summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTincu Gabriel <gabi@aiven.io>2020-05-05 13:29:23 +0200
committerGitHub <noreply@github.com>2020-05-05 14:29:23 +0300
commit6fc008137c75c751a9fbea3e0ef36d2870119c7b (patch)
tree667c4cb56dd57c819e8a73387c6689383a5ac564
parentf9e0264e0b0f8d92afb6177d51976795e3bdbcd8 (diff)
downloadkafka-python-6fc008137c75c751a9fbea3e0ef36d2870119c7b.tar.gz
Add logic for inferring newer broker versions (#2038)
* Add logic for inferring newer broker versions - New Fetch / ListOffsets request / response objects - Add new test cases to infer code based on mentioned objects - Add unit test to check inferred version against whatever resides in KAFKA_VERSION - Add new kafka broker versions to integration list - Add more kafka broker versions to travis task list - Add support for broker version 2.5 id * Implement PR change requests: fewer versions for travis testing, remove unused older versions for inference code, remove one minor version from known server list Do not use newly created ACL request / responses in allowed version lists, due to flexible versions enabling in kafka actually requiring a serialization protocol header update Revert admin client file change
-rw-r--r--.travis.yml1
-rwxr-xr-xbuild_integration.sh6
-rw-r--r--kafka/conn.py12
-rw-r--r--kafka/protocol/admin.py20
-rw-r--r--kafka/protocol/fetch.py182
-rw-r--r--kafka/protocol/offset.py89
-rw-r--r--servers/2.5.0/resources/kafka.properties (renamed from servers/2.2.0/resources/kafka.properties)0
-rw-r--r--servers/2.5.0/resources/kafka_server_jaas.conf (renamed from servers/2.2.0/resources/kafka_server_jaas.conf)0
-rw-r--r--servers/2.5.0/resources/log4j.properties (renamed from servers/2.2.0/resources/log4j.properties)0
-rw-r--r--servers/2.5.0/resources/zookeeper.properties (renamed from servers/2.2.0/resources/zookeeper.properties)0
-rw-r--r--test/test_consumer_integration.py15
11 files changed, 315 insertions, 10 deletions
diff --git a/.travis.yml b/.travis.yml
index 8e2fdfe..b98aa16 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -15,6 +15,7 @@ env:
- KAFKA_VERSION=0.11.0.3
- KAFKA_VERSION=1.1.1
- KAFKA_VERSION=2.4.0
+ - KAFKA_VERSION=2.5.0
addons:
apt:
diff --git a/build_integration.sh b/build_integration.sh
index 98b9b27..c020b0f 100755
--- a/build_integration.sh
+++ b/build_integration.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1"}
+: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1 2.2.1 2.3.0 2.4.0 2.5.0"}
: ${SCALA_VERSION:=2.11}
: ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/}
: ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git}
@@ -33,12 +33,14 @@ pushd servers
echo "-------------------------------------"
echo "Checking kafka binaries for ${kafka}"
echo
- # kafka 0.8.0 is only available w/ scala 2.8.0
if [ "$kafka" == "0.8.0" ]; then
KAFKA_ARTIFACT="kafka_2.8.0-${kafka}.tar.gz"
+ else if [ "$kafka" \> "2.4.0" ]; then
+ KAFKA_ARTIFACT="kafka_2.12-${kafka}.tgz"
else
KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}.tgz"
fi
+ fi
if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then
if [ -f "${KAFKA_ARTIFACT}" ]; then
echo "Using cached artifact: ${KAFKA_ARTIFACT}"
diff --git a/kafka/conn.py b/kafka/conn.py
index c383123..5c72875 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -24,9 +24,12 @@ import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.oauth.abstract import AbstractTokenProvider
-from kafka.protocol.admin import SaslHandShakeRequest
+from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2
from kafka.protocol.commit import OffsetFetchRequest
+from kafka.protocol.offset import OffsetRequest
+from kafka.protocol.produce import ProduceRequest
from kafka.protocol.metadata import MetadataRequest
+from kafka.protocol.fetch import FetchRequest
from kafka.protocol.parser import KafkaProtocol
from kafka.protocol.types import Int32, Int8
from kafka.scram import ScramClient
@@ -1166,6 +1169,13 @@ class BrokerConnection(object):
# in reverse order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
+ ((2, 5, 0), DescribeAclsRequest_v2),
+ ((2, 4, 0), ProduceRequest[8]),
+ ((2, 3, 0), FetchRequest[11]),
+ ((2, 2, 0), OffsetRequest[5]),
+ ((2, 1, 0), FetchRequest[10]),
+ ((2, 0, 0), FetchRequest[8]),
+ ((1, 1, 0), FetchRequest[7]),
((1, 0, 0), MetadataRequest[5]),
((0, 11, 0), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
index b2694dc..af88ea4 100644
--- a/kafka/protocol/admin.py
+++ b/kafka/protocol/admin.py
@@ -477,6 +477,13 @@ class DescribeAclsResponse_v1(Response):
('permission_type', Int8)))))
)
+
+class DescribeAclsResponse_v2(Response):
+ API_KEY = 29
+ API_VERSION = 2
+ SCHEMA = DescribeAclsResponse_v1.SCHEMA
+
+
class DescribeAclsRequest_v0(Request):
API_KEY = 29
API_VERSION = 0
@@ -490,6 +497,7 @@ class DescribeAclsRequest_v0(Request):
('permission_type', Int8)
)
+
class DescribeAclsRequest_v1(Request):
API_KEY = 29
API_VERSION = 1
@@ -504,6 +512,17 @@ class DescribeAclsRequest_v1(Request):
('permission_type', Int8)
)
+
+class DescribeAclsRequest_v2(Request):
+ """
+ Enable flexible version
+ """
+ API_KEY = 29
+ API_VERSION = 2
+ RESPONSE_TYPE = DescribeAclsResponse_v2
+ SCHEMA = DescribeAclsRequest_v1.SCHEMA
+
+
DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1]
DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1]
@@ -862,3 +881,4 @@ CreatePartitionsRequest = [
CreatePartitionsResponse = [
CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
]
+
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index dd3f648..f367848 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -94,6 +94,72 @@ class FetchResponse_v6(Response):
SCHEMA = FetchResponse_v5.SCHEMA
+class FetchResponse_v7(Response):
+ """
+ Add error_code and session_id to response
+ """
+ API_KEY = 1
+ API_VERSION = 7
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('error_code', Int16),
+ ('session_id', Int32),
+ ('topics', Array(
+ ('topics', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('highwater_offset', Int64),
+ ('last_stable_offset', Int64),
+ ('log_start_offset', Int64),
+ ('aborted_transactions', Array(
+ ('producer_id', Int64),
+ ('first_offset', Int64))),
+ ('message_set', Bytes)))))
+ )
+
+
+class FetchResponse_v8(Response):
+ API_KEY = 1
+ API_VERSION = 8
+ SCHEMA = FetchResponse_v7.SCHEMA
+
+
+class FetchResponse_v9(Response):
+ API_KEY = 1
+ API_VERSION = 9
+ SCHEMA = FetchResponse_v7.SCHEMA
+
+
+class FetchResponse_v10(Response):
+ API_KEY = 1
+ API_VERSION = 10
+ SCHEMA = FetchResponse_v7.SCHEMA
+
+
+class FetchResponse_v11(Response):
+ API_KEY = 1
+ API_VERSION = 11
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('error_code', Int16),
+ ('session_id', Int32),
+ ('topics', Array(
+ ('topics', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('highwater_offset', Int64),
+ ('last_stable_offset', Int64),
+ ('log_start_offset', Int64),
+ ('aborted_transactions', Array(
+ ('producer_id', Int64),
+ ('first_offset', Int64))),
+ ('preferred_read_replica', Int32),
+ ('message_set', Bytes)))))
+ )
+
+
class FetchRequest_v0(Request):
API_KEY = 1
API_VERSION = 0
@@ -196,13 +262,125 @@ class FetchRequest_v6(Request):
SCHEMA = FetchRequest_v5.SCHEMA
+class FetchRequest_v7(Request):
+ """
+ Add incremental fetch requests
+ """
+ API_KEY = 1
+ API_VERSION = 7
+ RESPONSE_TYPE = FetchResponse_v7
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('max_wait_time', Int32),
+ ('min_bytes', Int32),
+ ('max_bytes', Int32),
+ ('isolation_level', Int8),
+ ('session_id', Int32),
+ ('session_epoch', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('fetch_offset', Int64),
+ ('log_start_offset', Int64),
+ ('max_bytes', Int32))))),
+ ('forgotten_topics_data', Array(
+ ('topic', String),
+ ('partitions', Array(Int32))
+ )),
+ )
+
+
+class FetchRequest_v8(Request):
+ """
+ bump used to indicate that on quota violation brokers send out responses before throttling.
+ """
+ API_KEY = 1
+ API_VERSION = 8
+ RESPONSE_TYPE = FetchResponse_v8
+ SCHEMA = FetchRequest_v7.SCHEMA
+
+
+class FetchRequest_v9(Request):
+ """
+ adds the current leader epoch (see KIP-320)
+ """
+ API_KEY = 1
+ API_VERSION = 9
+ RESPONSE_TYPE = FetchResponse_v9
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('max_wait_time', Int32),
+ ('min_bytes', Int32),
+ ('max_bytes', Int32),
+ ('isolation_level', Int8),
+ ('session_id', Int32),
+ ('session_epoch', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('current_leader_epoch', Int32),
+ ('fetch_offset', Int64),
+ ('log_start_offset', Int64),
+ ('max_bytes', Int32))))),
+ ('forgotten_topics_data', Array(
+ ('topic', String),
+ ('partitions', Array(Int32)),
+ )),
+ )
+
+
+class FetchRequest_v10(Request):
+ """
+ bumped up to indicate ZStandard capability. (see KIP-110)
+ """
+ API_KEY = 1
+ API_VERSION = 10
+ RESPONSE_TYPE = FetchResponse_v10
+ SCHEMA = FetchRequest_v9.SCHEMA
+
+
+class FetchRequest_v11(Request):
+ """
+ added rack ID to support read from followers (KIP-392)
+ """
+ API_KEY = 1
+ API_VERSION = 11
+ RESPONSE_TYPE = FetchResponse_v11
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('max_wait_time', Int32),
+ ('min_bytes', Int32),
+ ('max_bytes', Int32),
+ ('isolation_level', Int8),
+ ('session_id', Int32),
+ ('session_epoch', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('current_leader_epoch', Int32),
+ ('fetch_offset', Int64),
+ ('log_start_offset', Int64),
+ ('max_bytes', Int32))))),
+ ('forgotten_topics_data', Array(
+ ('topic', String),
+ ('partitions', Array(Int32))
+ )),
+ ('rack_id', String('utf-8')),
+ )
+
+
FetchRequest = [
FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5,
- FetchRequest_v6
+ FetchRequest_v6, FetchRequest_v7, FetchRequest_v8,
+ FetchRequest_v9, FetchRequest_v10, FetchRequest_v11,
]
FetchResponse = [
FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5,
- FetchResponse_v6
+ FetchResponse_v6, FetchResponse_v7, FetchResponse_v8,
+ FetchResponse_v9, FetchResponse_v10, FetchResponse_v11,
]
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py
index 3c254de..1ed382b 100644
--- a/kafka/protocol/offset.py
+++ b/kafka/protocol/offset.py
@@ -53,6 +53,43 @@ class OffsetResponse_v2(Response):
)
+class OffsetResponse_v3(Response):
+ """
+ on quota violation, brokers send out responses before throttling
+ """
+ API_KEY = 2
+ API_VERSION = 3
+ SCHEMA = OffsetResponse_v2.SCHEMA
+
+
+class OffsetResponse_v4(Response):
+ """
+ Add leader_epoch to response
+ """
+ API_KEY = 2
+ API_VERSION = 4
+ SCHEMA = Schema(
+ ('throttle_time_ms', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('timestamp', Int64),
+ ('offset', Int64),
+ ('leader_epoch', Int32)))))
+ )
+
+
+class OffsetResponse_v5(Response):
+ """
+ adds a new error code, OFFSET_NOT_AVAILABLE
+ """
+ API_KEY = 2
+ API_VERSION = 5
+ SCHEMA = OffsetResponse_v4.SCHEMA
+
+
class OffsetRequest_v0(Request):
API_KEY = 2
API_VERSION = 0
@@ -105,5 +142,53 @@ class OffsetRequest_v2(Request):
}
-OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2]
-OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2]
+class OffsetRequest_v3(Request):
+ API_KEY = 2
+ API_VERSION = 3
+ RESPONSE_TYPE = OffsetResponse_v3
+ SCHEMA = OffsetRequest_v2.SCHEMA
+ DEFAULTS = {
+ 'replica_id': -1
+ }
+
+
+class OffsetRequest_v4(Request):
+ """
+ Add current_leader_epoch to request
+ """
+ API_KEY = 2
+ API_VERSION = 4
+ RESPONSE_TYPE = OffsetResponse_v4
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('isolation_level', Int8), # <- added isolation_level
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('current_leader_epoch', Int64),
+ ('timestamp', Int64)))))
+ )
+ DEFAULTS = {
+ 'replica_id': -1
+ }
+
+
+class OffsetRequest_v5(Request):
+ API_KEY = 2
+ API_VERSION = 5
+ RESPONSE_TYPE = OffsetResponse_v5
+ SCHEMA = OffsetRequest_v4.SCHEMA
+ DEFAULTS = {
+ 'replica_id': -1
+ }
+
+
+OffsetRequest = [
+ OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2,
+ OffsetRequest_v3, OffsetRequest_v4, OffsetRequest_v5,
+]
+OffsetResponse = [
+ OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2,
+ OffsetResponse_v3, OffsetResponse_v4, OffsetResponse_v5,
+]
diff --git a/servers/2.2.0/resources/kafka.properties b/servers/2.5.0/resources/kafka.properties
index 5775cfd..5775cfd 100644
--- a/servers/2.2.0/resources/kafka.properties
+++ b/servers/2.5.0/resources/kafka.properties
diff --git a/servers/2.2.0/resources/kafka_server_jaas.conf b/servers/2.5.0/resources/kafka_server_jaas.conf
index 18efe43..18efe43 100644
--- a/servers/2.2.0/resources/kafka_server_jaas.conf
+++ b/servers/2.5.0/resources/kafka_server_jaas.conf
diff --git a/servers/2.2.0/resources/log4j.properties b/servers/2.5.0/resources/log4j.properties
index b0b76aa..b0b76aa 100644
--- a/servers/2.2.0/resources/log4j.properties
+++ b/servers/2.5.0/resources/log4j.properties
diff --git a/servers/2.2.0/resources/zookeeper.properties b/servers/2.5.0/resources/zookeeper.properties
index e3fd097..e3fd097 100644
--- a/servers/2.2.0/resources/zookeeper.properties
+++ b/servers/2.5.0/resources/zookeeper.properties
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 6e6bc94..90b7ed2 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -6,15 +6,24 @@ import pytest
from kafka.vendor.six.moves import range
import kafka.codec
-from kafka.errors import (
- KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError
-)
+from kafka.errors import UnsupportedCodecError, UnsupportedVersionError
from kafka.structs import TopicPartition, OffsetAndTimestamp
from test.testutil import Timer, assert_message_count, env_kafka_version, random_string
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
+def test_kafka_version_infer(kafka_consumer_factory):
+ consumer = kafka_consumer_factory()
+ actual_ver_major_minor = env_kafka_version()[:2]
+ client = consumer._client
+ conn = list(client._conns.values())[0]
+ inferred_ver_major_minor = conn.check_version()[:2]
+ assert actual_ver_major_minor == inferred_ver_major_minor, \
+ "Was expecting inferred broker version to be %s but was %s" % (actual_ver_major_minor, inferred_ver_major_minor)
+
+
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer(kafka_consumer_factory, send_messages):
"""Test KafkaConsumer"""
consumer = kafka_consumer_factory(auto_offset_reset='earliest')