summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/admin/client.py37
-rw-r--r--kafka/client_async.py2
-rw-r--r--kafka/conn.py6
-rw-r--r--kafka/consumer/group.py16
-rw-r--r--kafka/errors.py17
5 files changed, 61 insertions, 17 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index e25afe7..5b1e03b 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -35,12 +35,23 @@ class KafkaAdminClient(object):
nicer, more pythonic objects. Unfortunately, this will likely break
those interfaces.
- The KafkaAdminClient class will negotiate for the latest version of each message
- protocol format supported by both the kafka-python client library and the
- Kafka broker. Usage of optional fields from protocol versions that are not
- supported by the broker will result in IncompatibleBrokerVersion exceptions.
-
- Use of this class requires a minimum broker version >= 0.10.0.0.
+ The KafkaAdminClient class will negotiate for the latest version of each
+ Kafka protocol message format supported by both the kafka-python client
+ library and the Kafka broker. Two different exceptions can be raised when a
+ method needs to use a Kafka protocol message format that is not supported
+ by the broker. An IncompatibleBrokerVersion exception means kafka-python
+ does not think the broker supports that message format and does not even
+ try to send it. An UnsupportedVersionError exception means that
+ kafka-python tried to send the message and the broker rejected it. If you
+ encounter UnsupportedVersionError, please file a bug, as we would prefer
+ to identify the error client-side and raise a IncompatibleBrokerVersion.
+
+ Note:
+ If the KafkaAdminClient's `api_version` is unset, then the broker
+ version is ascertained by querying a random broker in the cluster. So
+ if the cluster is running a mixture of old and new brokers, there is a
+ chance the version-checking will be incorrect. In that case, please pin
+ the `api_version` of the KafkaAdminClient.
Keyword Arguments:
bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
@@ -252,7 +263,7 @@ class KafkaAdminClient(object):
controller_version = self._client.check_version(controller_id)
if controller_version < (0, 10, 0):
raise IncompatibleBrokerVersion(
- "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
+ "The controller appears to be running Kafka {}. KafkaAdminClient requires controllers >= 0.10.0.0."
.format(controller_version))
self._controller_id = controller_id
else:
@@ -591,6 +602,10 @@ class KafkaAdminClient(object):
partition assignments.
"""
group_descriptions = []
+ # TODO this can be used against brokers older than 0.10.0.0, so either
+ # need to fix BrokerConnection.get_api_versions() (and the docstring of
+ # KafkaClient.get_api_versions()), or add a bypass here when api_version is
+ # old.
version = self._matching_api_version(DescribeGroupsRequest)
for group_id in group_ids:
if group_coordinator_id is not None:
@@ -661,6 +676,10 @@ class KafkaAdminClient(object):
consumer_groups = set()
if broker_ids is None:
broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
+ # TODO this can be used against brokers older than 0.10.0.0, so either
+ # need to fix BrokerConnection.get_api_versions() (and the docstring of
+ # KafkaClient.get_api_versions()), or add a bypass here when api_version is
+ # old.
version = self._matching_api_version(ListGroupsRequest)
if version <= 2:
request = ListGroupsRequest[version]()
@@ -707,6 +726,10 @@ class KafkaAdminClient(object):
group_offsets_listing = {}
if group_coordinator_id is None:
group_coordinator_id = self._find_group_coordinator_id(group_id)
+ # TODO this can be used against brokers older than 0.10.0.0, so either
+ # need to fix BrokerConnection.get_api_versions() (and the docstring of
+ # KafkaClient.get_api_versions()), or add a bypass here when api_version is
+ # old.
version = self._matching_api_version(OffsetFetchRequest)
if version <= 3:
if partitions is None:
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b0d1f5e..566634b 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -814,6 +814,8 @@ class KafkaClient(object):
def get_api_versions(self):
"""Return the ApiVersions map, if available.
+ # TODO if I update the BrokerConnection.get_api_versions(), also update
+ # this docstring
Note: A call to check_version must previously have succeeded and returned
version 0.10.0 or later
diff --git a/kafka/conn.py b/kafka/conn.py
index 471bae7..bc87a22 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -876,7 +876,11 @@ class BrokerConnection(object):
def get_api_versions(self):
version = self.check_version()
if version < (0, 10, 0):
- raise Errors.UnsupportedVersionError(
+ # TODO this currently blocks using various KafkaAdmin methods
+ # against older brokers that are supported, such as
+ # list_consumer_groups(), list_consumer_group_offsets(),
+ # describe_consumer_groups(), and _find_group_coordinator()
+ raise Errors.IncompatibleBrokerVersion(
"ApiVersion not supported by cluster version {} < 0.10.0"
.format(version))
# _api_versions is set as a side effect of check_versions() on a cluster
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 8727de7..5f75c48 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -5,7 +5,7 @@ import logging
import socket
import time
-from kafka.errors import KafkaConfigurationError, UnsupportedVersionError
+from kafka.errors import KafkaConfigurationError, IncompatibleBrokerVersion
from kafka.vendor import six
@@ -943,12 +943,12 @@ class KafkaConsumer(six.Iterator):
Raises:
ValueError: If the target timestamp is negative
- UnsupportedVersionError: If the broker does not support looking
- up the offsets by timestamp.
+ IncompatibleBrokerVersion: Raised by kafka-python when it does not
+ think the broker supports looking up the offsets by timestamp.
KafkaTimeoutError: If fetch failed in request_timeout_ms
"""
if self.config['api_version'] <= (0, 10, 0):
- raise UnsupportedVersionError(
+ raise IncompatibleBrokerVersion(
"offsets_for_times API not supported for cluster version {}"
.format(self.config['api_version']))
for tp, ts in six.iteritems(timestamps):
@@ -978,8 +978,8 @@ class KafkaConsumer(six.Iterator):
given partitions.
Raises:
- UnsupportedVersionError: If the broker does not support looking
- up the offsets by timestamp.
+ IncompatibleBrokerVersion: Raised by kafka-python when it does not
+ think the broker supports looking up the offsets by timestamp.
KafkaTimeoutError: If fetch failed in request_timeout_ms.
"""
offsets = self._fetcher.beginning_offsets(
@@ -1005,8 +1005,8 @@ class KafkaConsumer(six.Iterator):
``{TopicPartition: int}``: The end offsets for the given partitions.
Raises:
- UnsupportedVersionError: If the broker does not support looking
- up the offsets by timestamp.
+ IncompatibleBrokerVersion: Raised by kafka-python when it does not
+ think the broker supports looking up the offsets by timestamp.
KafkaTimeoutError: If fetch failed in request_timeout_ms
"""
offsets = self._fetcher.end_offsets(
diff --git a/kafka/errors.py b/kafka/errors.py
index 118e430..aac167c 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -59,10 +59,18 @@ class MetadataEmptyBrokerList(KafkaError):
class UnrecognizedBrokerVersion(KafkaError):
+ # Cannot determine the broker version
pass
class IncompatibleBrokerVersion(KafkaError):
+ # TODO convert this first comment to the default exception message... need to check message vs description is used when assembling the exception string
+ # kafka-python thinks the broker version is incompatible with this Kafka
+ # protocol message format and does not even try to send the message.
+
+ # See also UnsupportedBrokerVersion for the broker-side equivalent.
+ # Although the error is the same, we want to be able to differentiate
+ # between client-side and broker-side errors for easier debugging.
pass
@@ -377,9 +385,16 @@ class IllegalSaslStateError(BrokerResponseError):
class UnsupportedVersionError(BrokerResponseError):
+ # See also IncompatibleBrokerVersion for the client-side equivalent.
+ # Although the error is the same, we want to be able to differentiate
+ # between client-side and broker-side errors for easier debugging.
errno = 35
message = 'UNSUPPORTED_VERSION'
- description = 'The version of API is not supported.'
+ description = 'The version of API is not supported. If you encounter this '
+ 'error while using normal kafka-python methods and not
+ 'directly using the low-level protocol structs, please file '
+ 'an issue as we would prefer to identify the error '
+ 'client-side and raise an IncompatibleBrokerVersion.'
class TopicAlreadyExistsError(BrokerResponseError):