summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-07-09 08:04:39 -0700
committerGitHub <noreply@github.com>2017-07-09 08:04:39 -0700
commitd0c6b1f95c2e677545d1faaeae525e8768abea9e (patch)
tree4c889cb6be669e36e4992a74ece3b8e7038d0c80
parent2f75169504c8bd6f31ab4a88823a8073eb57eced (diff)
downloadkafka-python-d0c6b1f95c2e677545d1faaeae525e8768abea9e.tar.gz
Protocol updates for 0.11.0.0 (#1127)
-rw-r--r--kafka/protocol/admin.py128
-rw-r--r--kafka/protocol/commit.py95
-rw-r--r--kafka/protocol/fetch.py94
-rw-r--r--kafka/protocol/group.py93
-rw-r--r--kafka/protocol/metadata.py63
-rw-r--r--kafka/protocol/offset.py39
-rw-r--r--kafka/protocol/produce.py37
7 files changed, 508 insertions, 41 deletions
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index c5142b3..09746bf 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -16,6 +16,19 @@ class ApiVersionResponse_v0(Response):
)
+class ApiVersionResponse_v1(Response):
+ API_KEY = 18
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('error_code', Int16),
+ ('api_versions', Array(
+ ('api_key', Int16),
+ ('min_version', Int16),
+ ('max_version', Int16))),
+ ('throttle_time_ms', Int32)
+ )
+
+
class ApiVersionRequest_v0(Request):
API_KEY = 18
API_VERSION = 0
@@ -23,8 +36,15 @@ class ApiVersionRequest_v0(Request):
SCHEMA = Schema()
-ApiVersionRequest = [ApiVersionRequest_v0]
-ApiVersionResponse = [ApiVersionResponse_v0]
+class ApiVersionRequest_v1(Request):
+ API_KEY = 18
+ API_VERSION = 1
+ RESPONSE_TYPE = ApiVersionResponse_v1
+ SCHEMA = ApiVersionRequest_v0.SCHEMA
+
+
+ApiVersionRequest = [ApiVersionRequest_v0, ApiVersionRequest_v1]
+ApiVersionResponse = [ApiVersionResponse_v0, ApiVersionResponse_v1]
class CreateTopicsResponse_v0(Response):
@@ -48,6 +68,18 @@ class CreateTopicsResponse_v1(Response):
)
+class CreateTopicsResponse_v2(Response):
+ API_KEY = 19
+ API_VERSION = 2
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topic_error_codes', Array(
+ ('topic', String('utf-8')),
+ ('error_code', Int16),
+ ('error_message', String('utf-8'))))
+ )
+
+
class CreateTopicsRequest_v0(Request):
API_KEY = 19
API_VERSION = 0
@@ -87,8 +119,19 @@ class CreateTopicsRequest_v1(Request):
)
-CreateTopicsRequest = [CreateTopicsRequest_v0, CreateTopicsRequest_v1]
-CreateTopicsResponse = [CreateTopicsResponse_v0, CreateTopicsRequest_v1]
+class CreateTopicsRequest_v2(Request):
+ API_KEY = 19
+ API_VERSION = 2
+ RESPONSE_TYPE = CreateTopicsResponse_v2
+ SCHEMA = CreateTopicsRequest_v1.SCHEMA
+
+
+CreateTopicsRequest = [
+ CreateTopicsRequest_v0, CreateTopicsRequest_v1, CreateTopicsRequest_v2
+]
+CreateTopicsResponse = [
+ CreateTopicsResponse_v0, CreateTopicsResponse_v1, CreateTopicsResponse_v2
+]
class DeleteTopicsResponse_v0(Response):
@@ -101,6 +144,17 @@ class DeleteTopicsResponse_v0(Response):
)
+class DeleteTopicsResponse_v1(Response):
+ API_KEY = 20
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topic_error_codes', Array(
+ ('topic', String('utf-8')),
+ ('error_code', Int16)))
+ )
+
+
class DeleteTopicsRequest_v0(Request):
API_KEY = 20
API_VERSION = 0
@@ -111,8 +165,15 @@ class DeleteTopicsRequest_v0(Request):
)
-DeleteTopicsRequest = [DeleteTopicsRequest_v0]
-DeleteTopicsResponse = [DeleteTopicsResponse_v0]
+class DeleteTopicsRequest_v1(Request):
+ API_KEY = 20
+ API_VERSION = 1
+ RESPONSE_TYPE = DeleteTopicsResponse_v1
+ SCHEMA = DeleteTopicsRequest_v0.SCHEMA
+
+
+DeleteTopicsRequest = [DeleteTopicsRequest_v0, DeleteTopicsRequest_v1]
+DeleteTopicsResponse = [DeleteTopicsResponse_v0, DeleteTopicsResponse_v1]
class ListGroupsResponse_v0(Response):
@@ -126,6 +187,18 @@ class ListGroupsResponse_v0(Response):
)
+class ListGroupsResponse_v1(Response):
+ API_KEY = 16
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('error_code', Int16),
+ ('groups', Array(
+ ('group', String('utf-8')),
+ ('protocol_type', String('utf-8'))))
+ )
+
+
class ListGroupsRequest_v0(Request):
API_KEY = 16
API_VERSION = 0
@@ -133,8 +206,15 @@ class ListGroupsRequest_v0(Request):
SCHEMA = Schema()
-ListGroupsRequest = [ListGroupsRequest_v0]
-ListGroupsResponse = [ListGroupsResponse_v0]
+class ListGroupsRequest_v1(Request):
+ API_KEY = 16
+ API_VERSION = 1
+ RESPONSE_TYPE = ListGroupsResponse_v1
+ SCHEMA = ListGroupsRequest_v0.SCHEMA
+
+
+ListGroupsRequest = [ListGroupsRequest_v0, ListGroupsRequest_v1]
+ListGroupsResponse = [ListGroupsResponse_v0, ListGroupsResponse_v1]
class DescribeGroupsResponse_v0(Response):
@@ -156,6 +236,27 @@ class DescribeGroupsResponse_v0(Response):
)
+class DescribeGroupsResponse_v1(Response):
+ API_KEY = 15
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('groups', Array(
+ ('error_code', Int16),
+ ('group', String('utf-8')),
+ ('state', String('utf-8')),
+ ('protocol_type', String('utf-8')),
+ ('protocol', String('utf-8')),
+ ('members', Array(
+ ('member_id', String('utf-8')),
+ ('client_id', String('utf-8')),
+ ('client_host', String('utf-8')),
+ ('member_metadata', Bytes),
+ ('member_assignment', Bytes)))))
+ )
+
+
+
class DescribeGroupsRequest_v0(Request):
API_KEY = 15
API_VERSION = 0
@@ -165,8 +266,15 @@ class DescribeGroupsRequest_v0(Request):
)
-DescribeGroupsRequest = [DescribeGroupsRequest_v0]
-DescribeGroupsResponse = [DescribeGroupsResponse_v0]
+class DescribeGroupsRequest_v1(Request):
+ API_KEY = 15
+ API_VERSION = 1
+ RESPONSE_TYPE = DescribeGroupsResponse_v1
+ SCHEMA = DescribeGroupsRequest_v0.SCHEMA
+
+
+DescribeGroupsRequest = [DescribeGroupsRequest_v0, DescribeGroupsRequest_v1]
+DescribeGroupsResponse = [DescribeGroupsResponse_v0, DescribeGroupsResponse_v1]
class SaslHandShakeResponse_v0(Response):
diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py
index bcffe67..9d744c7 100644
--- a/kafka/protocol/commit.py
+++ b/kafka/protocol/commit.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import
from .api import Request, Response
-from .types import Array, Int16, Int32, Int64, Schema, String
+from .types import Array, Int8, Int16, Int32, Int64, Schema, String
class OffsetCommitResponse_v0(Response):
@@ -28,6 +28,19 @@ class OffsetCommitResponse_v2(Response):
SCHEMA = OffsetCommitResponse_v1.SCHEMA
+class OffsetCommitResponse_v3(Response):
+ API_KEY = 8
+ API_VERSION = 3
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16)))))
+ )
+
+
class OffsetCommitRequest_v0(Request):
API_KEY = 8
API_VERSION = 0 # Zookeeper-backed storage
@@ -81,10 +94,21 @@ class OffsetCommitRequest_v2(Request):
DEFAULT_RETENTION_TIME = -1
-OffsetCommitRequest = [OffsetCommitRequest_v0, OffsetCommitRequest_v1,
- OffsetCommitRequest_v2]
-OffsetCommitResponse = [OffsetCommitResponse_v0, OffsetCommitResponse_v1,
- OffsetCommitResponse_v2]
+class OffsetCommitRequest_v3(Request):
+ API_KEY = 8
+ API_VERSION = 3
+ RESPONSE_TYPE = OffsetCommitResponse_v3
+ SCHEMA = OffsetCommitRequest_v2.SCHEMA
+
+
+OffsetCommitRequest = [
+ OffsetCommitRequest_v0, OffsetCommitRequest_v1,
+ OffsetCommitRequest_v2, OffsetCommitRequest_v3
+]
+OffsetCommitResponse = [
+ OffsetCommitResponse_v0, OffsetCommitResponse_v1,
+ OffsetCommitResponse_v2, OffsetCommitResponse_v3
+]
class OffsetFetchResponse_v0(Response):
@@ -123,6 +147,22 @@ class OffsetFetchResponse_v2(Response):
)
+class OffsetFetchResponse_v3(Response):
+ API_KEY = 9
+ API_VERSION = 3
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('offset', Int64),
+ ('metadata', String('utf-8')),
+ ('error_code', Int16))))),
+ ('error_code', Int16)
+ )
+
+
class OffsetFetchRequest_v0(Request):
API_KEY = 9
API_VERSION = 0 # zookeeper-backed storage
@@ -152,10 +192,21 @@ class OffsetFetchRequest_v2(Request):
SCHEMA = OffsetFetchRequest_v1.SCHEMA
-OffsetFetchRequest = [OffsetFetchRequest_v0, OffsetFetchRequest_v1,
- OffsetFetchRequest_v2]
-OffsetFetchResponse = [OffsetFetchResponse_v0, OffsetFetchResponse_v1,
- OffsetFetchResponse_v2]
+class OffsetFetchRequest_v3(Request):
+ API_KEY = 9
+ API_VERSION = 3
+ RESPONSE_TYPE = OffsetFetchResponse_v3
+ SCHEMA = OffsetFetchRequest_v2.SCHEMA
+
+
+OffsetFetchRequest = [
+ OffsetFetchRequest_v0, OffsetFetchRequest_v1,
+ OffsetFetchRequest_v2, OffsetFetchRequest_v3,
+]
+OffsetFetchResponse = [
+ OffsetFetchResponse_v0, OffsetFetchResponse_v1,
+ OffsetFetchResponse_v2, OffsetFetchResponse_v3,
+]
class GroupCoordinatorResponse_v0(Response):
@@ -169,6 +220,18 @@ class GroupCoordinatorResponse_v0(Response):
)
+class GroupCoordinatorResponse_v1(Response):
+ API_KEY = 10
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('error_code', Int16),
+ ('error_message', String('utf-8')),
+ ('coordinator_id', Int32),
+ ('host', String('utf-8')),
+ ('port', Int32)
+ )
+
+
class GroupCoordinatorRequest_v0(Request):
API_KEY = 10
API_VERSION = 0
@@ -178,5 +241,15 @@ class GroupCoordinatorRequest_v0(Request):
)
-GroupCoordinatorRequest = [GroupCoordinatorRequest_v0]
-GroupCoordinatorResponse = [GroupCoordinatorResponse_v0]
+class GroupCoordinatorRequest_v1(Request):
+ API_KEY = 10
+ API_VERSION = 1
+ RESPONSE_TYPE = GroupCoordinatorResponse_v1
+ SCHEMA = Schema(
+ ('coordinator_key', String('utf-8')),
+ ('coordinator_type', Int8)
+ )
+
+
+GroupCoordinatorRequest = [GroupCoordinatorRequest_v0, GroupCoordinatorRequest_v1]
+GroupCoordinatorResponse = [GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1]
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index b441e63..359f197 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import
from .api import Request, Response
from .message import MessageSet
-from .types import Array, Int16, Int32, Int64, Schema, String
+from .types import Array, Int8, Int16, Int32, Int64, Schema, String
class FetchResponse_v0(Response):
@@ -46,6 +46,45 @@ class FetchResponse_v3(Response):
SCHEMA = FetchResponse_v2.SCHEMA
+class FetchResponse_v4(Response):
+ API_KEY = 1
+ API_VERSION = 4
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topics', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('highwater_offset', Int64),
+ ('last_stable_offset', Int64),
+ ('aborted_transactions', Array(
+ ('producer_id', Int64),
+ ('first_offset', Int64))),
+ ('message_set', MessageSet)))))
+ )
+
+
+class FetchResponse_v5(Response):
+ API_KEY = 1
+ API_VERSION = 5
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topics', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('highwater_offset', Int64),
+ ('last_stable_offset', Int64),
+ ('log_start_offset', Int64),
+ ('aborted_transactions', Array(
+ ('producer_id', Int64),
+ ('first_offset', Int64))),
+ ('message_set', MessageSet)))))
+ )
+
+
class FetchRequest_v0(Request):
API_KEY = 1
API_VERSION = 0
@@ -95,7 +134,52 @@ class FetchRequest_v3(Request):
)
-FetchRequest = [FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
- FetchRequest_v3]
-FetchResponse = [FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
- FetchResponse_v3]
+class FetchRequest_v4(Request):
+ # Adds isolation_level field
+ API_KEY = 1
+ API_VERSION = 4
+ RESPONSE_TYPE = FetchResponse_v4
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('max_wait_time', Int32),
+ ('min_bytes', Int32),
+ ('max_bytes', Int32),
+ ('isolation_level', Int8),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('offset', Int64),
+ ('max_bytes', Int32)))))
+ )
+
+
+class FetchRequest_v5(Request):
+ # This may only be used in broker-broker api calls
+ API_KEY = 1
+ API_VERSION = 5
+ RESPONSE_TYPE = FetchResponse_v5
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('max_wait_time', Int32),
+ ('min_bytes', Int32),
+ ('max_bytes', Int32),
+ ('isolation_level', Int8),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('fetch_offset', Int64),
+ ('log_start_offset', Int64),
+ ('max_bytes', Int32)))))
+ )
+
+
+FetchRequest = [
+ FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
+ FetchRequest_v3, FetchRequest_v4, FetchRequest_v5
+]
+FetchResponse = [
+ FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
+ FetchResponse_v3, FetchResponse_v4, FetchResponse_v5
+]
diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py
index 5cab754..ce75a5f 100644
--- a/kafka/protocol/group.py
+++ b/kafka/protocol/group.py
@@ -26,6 +26,22 @@ class JoinGroupResponse_v1(Response):
SCHEMA = JoinGroupResponse_v0.SCHEMA
+class JoinGroupResponse_v2(Response):
+ API_KEY = 11
+ API_VERSION = 2
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('error_code', Int16),
+ ('generation_id', Int32),
+ ('group_protocol', String('utf-8')),
+ ('leader_id', String('utf-8')),
+ ('member_id', String('utf-8')),
+ ('members', Array(
+ ('member_id', String('utf-8')),
+ ('member_metadata', Bytes)))
+ )
+
+
class JoinGroupRequest_v0(Request):
API_KEY = 11
API_VERSION = 0
@@ -59,8 +75,20 @@ class JoinGroupRequest_v1(Request):
UNKNOWN_MEMBER_ID = ''
-JoinGroupRequest = [JoinGroupRequest_v0, JoinGroupRequest_v1]
-JoinGroupResponse = [JoinGroupResponse_v0, JoinGroupResponse_v1]
+class JoinGroupRequest_v2(Request):
+ API_KEY = 11
+ API_VERSION = 2
+ RESPONSE_TYPE = JoinGroupResponse_v2
+ SCHEMA = JoinGroupRequest_v1.SCHEMA
+ UNKNOWN_MEMBER_ID = ''
+
+
+JoinGroupRequest = [
+ JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2
+]
+JoinGroupResponse = [
+ JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v1
+]
class ProtocolMetadata(Struct):
@@ -80,6 +108,16 @@ class SyncGroupResponse_v0(Response):
)
+class SyncGroupResponse_v1(Response):
+ API_KEY = 14
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('error_code', Int16),
+ ('member_assignment', Bytes)
+ )
+
+
class SyncGroupRequest_v0(Request):
API_KEY = 14
API_VERSION = 0
@@ -94,8 +132,15 @@ class SyncGroupRequest_v0(Request):
)
-SyncGroupRequest = [SyncGroupRequest_v0]
-SyncGroupResponse = [SyncGroupResponse_v0]
+class SyncGroupRequest_v1(Request):
+ API_KEY = 14
+ API_VERSION = 1
+ RESPONSE_TYPE = SyncGroupResponse_v1
+ SCHEMA = SyncGroupRequest_v0.SCHEMA
+
+
+SyncGroupRequest = [SyncGroupRequest_v0, SyncGroupRequest_v1]
+SyncGroupResponse = [SyncGroupResponse_v0, SyncGroupResponse_v1]
class MemberAssignment(Struct):
@@ -116,6 +161,15 @@ class HeartbeatResponse_v0(Response):
)
+class HeartbeatResponse_v1(Response):
+ API_KEY = 12
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('error_code', Int16)
+ )
+
+
class HeartbeatRequest_v0(Request):
API_KEY = 12
API_VERSION = 0
@@ -127,8 +181,15 @@ class HeartbeatRequest_v0(Request):
)
-HeartbeatRequest = [HeartbeatRequest_v0]
-HeartbeatResponse = [HeartbeatResponse_v0]
+class HeartbeatRequest_v1(Request):
+ API_KEY = 12
+ API_VERSION = 1
+ RESPONSE_TYPE = HeartbeatResponse_v1
+ SCHEMA = HeartbeatRequest_v0
+
+
+HeartbeatRequest = [HeartbeatRequest_v0, HeartbeatRequest_v1]
+HeartbeatResponse = [HeartbeatResponse_v0, HeartbeatResponse_v1]
class LeaveGroupResponse_v0(Response):
@@ -139,6 +200,15 @@ class LeaveGroupResponse_v0(Response):
)
+class LeaveGroupResponse_v1(Response):
+ API_KEY = 13
+ API_VERSION = 1
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('error_code', Int16)
+ )
+
+
class LeaveGroupRequest_v0(Request):
API_KEY = 13
API_VERSION = 0
@@ -149,5 +219,12 @@ class LeaveGroupRequest_v0(Request):
)
-LeaveGroupRequest = [LeaveGroupRequest_v0]
-LeaveGroupResponse = [LeaveGroupResponse_v0]
+class LeaveGroupRequest_v1(Request):
+ API_KEY = 13
+ API_VERSION = 1
+ RESPONSE_TYPE = LeaveGroupResponse_v1
+ SCHEMA = LeaveGroupRequest_v0.SCHEMA
+
+
+LeaveGroupRequest = [LeaveGroupRequest_v0, LeaveGroupRequest_v1]
+LeaveGroupResponse = [LeaveGroupResponse_v0, LeaveGroupResponse_v1]
diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py
index 907ec25..2be8209 100644
--- a/kafka/protocol/metadata.py
+++ b/kafka/protocol/metadata.py
@@ -71,6 +71,37 @@ class MetadataResponse_v2(Response):
)
+class MetadataResponse_v3(Response):
+ API_KEY = 3
+ API_VERSION = 3
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('brokers', Array(
+ ('node_id', Int32),
+ ('host', String('utf-8')),
+ ('port', Int32),
+ ('rack', String('utf-8')))),
+ ('cluster_id', String('utf-8')),
+ ('controller_id', Int32),
+ ('topics', Array(
+ ('error_code', Int16),
+ ('topic', String('utf-8')),
+ ('is_internal', Boolean),
+ ('partitions', Array(
+ ('error_code', Int16),
+ ('partition', Int32),
+ ('leader', Int32),
+ ('replicas', Array(Int32)),
+ ('isr', Array(Int32))))))
+ )
+
+
+class MetadataResponse_v4(Response):
+ API_KEY = 3
+ API_VERSION = 4
+ SCHEMA = MetadataResponse_v3.SCHEMA
+
+
class MetadataRequest_v0(Request):
API_KEY = 3
API_VERSION = 0
@@ -95,8 +126,36 @@ class MetadataRequest_v2(Request):
API_VERSION = 2
RESPONSE_TYPE = MetadataResponse_v2
SCHEMA = MetadataRequest_v1.SCHEMA
+ ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
+ NO_TOPICS = None # Empty array (len 0) for topics returns no topics
+
+
+class MetadataRequest_v3(Request):
+ API_KEY = 3
+ API_VERSION = 3
+ RESPONSE_TYPE = MetadataResponse_v3
+ SCHEMA = MetadataRequest_v1.SCHEMA
+ ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
+ NO_TOPICS = None # Empty array (len 0) for topics returns no topics
+
+
+class MetadataRequest_v4(Request):
+ API_KEY = 3
+ API_VERSION = 4
+ RESPONSE_TYPE = MetadataResponse_v4
+ SCHEMA = Schema(
+ ('topics', Array(String('utf-8'))),
+ ('allow_auto_topic_creation', Boolean)
+ )
+ ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
+ NO_TOPICS = None # Empty array (len 0) for topics returns no topics
-MetadataRequest = [MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2]
+MetadataRequest = [
+ MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2,
+ MetadataRequest_v3, MetadataRequest_v4
+]
MetadataResponse = [
- MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2]
+ MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2,
+ MetadataResponse_v3, MetadataResponse_v4
+]
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py
index 588dfec..8353f8c 100644
--- a/kafka/protocol/offset.py
+++ b/kafka/protocol/offset.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import
from .api import Request, Response
-from .types import Array, Int16, Int32, Int64, Schema, String
+from .types import Array, Int8, Int16, Int32, Int64, Schema, String
class OffsetResetStrategy(object):
@@ -36,6 +36,21 @@ class OffsetResponse_v1(Response):
)
+class OffsetResponse_v2(Response):
+ API_KEY = 2
+ API_VERSION = 2
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('timestamp', Int64),
+ ('offset', Int64)))))
+ )
+
+
class OffsetRequest_v0(Request):
API_KEY = 2
API_VERSION = 0
@@ -70,5 +85,23 @@ class OffsetRequest_v1(Request):
}
-OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1]
-OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1]
+class OffsetRequest_v2(Request):
+ API_KEY = 2
+ API_VERSION = 2
+ RESPONSE_TYPE = OffsetResponse_v2
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('isolation_level', Int8),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('timestamp', Int64)))))
+ )
+ DEFAULTS = {
+ 'replica_id': -1
+ }
+
+
+OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2]
+OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2]
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index 9b03354..da1f308 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -47,6 +47,12 @@ class ProduceResponse_v2(Response):
)
+class ProduceResponse_v3(Response):
+ API_KEY = 0
+ API_VERSION = 3
+ SCHEMA = ProduceResponse_v2.SCHEMA
+
+
class ProduceRequest_v0(Request):
API_KEY = 0
API_VERSION = 0
@@ -91,5 +97,32 @@ class ProduceRequest_v2(Request):
return True
-ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2]
-ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2]
+class ProduceRequest_v3(Request):
+ API_KEY = 0
+ API_VERSION = 3
+ RESPONSE_TYPE = ProduceResponse_v3
+ SCHEMA = Schema(
+ ('transactional_id', String('utf-8')),
+ ('required_acks', Int16),
+ ('timeout', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('messages', MessageSet)))))
+ )
+
+ def expect_response(self):
+ if self.required_acks == 0: # pylint: disable=no-member
+ return False
+ return True
+
+
+ProduceRequest = [
+ ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
+ ProduceRequest_v3
+]
+ProduceResponse = [
+ ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
+ ProduceResponse_v2
+]