summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2018-11-17 01:37:15 -0800
committerJeff Widman <jeff@jeffwidman.com>2018-11-17 16:23:37 -0800
commitaad278f9dd299e276cb643eb31d984ef9121f36d (patch)
tree7b7edf67ddd79f9ef63b399c05adb776790e5e12
parent7bd6b5da6d402565f25fce9e710be26b2d4cc125 (diff)
downloadkafka-python-dont-use-broker-errors-for-client-side-problems.tar.gz
Stop using broker-errors for client-side problemsdont-use-broker-errors-for-client-side-problems
`UnsupportedVersionError` is intended to indicate a server-side error: https://github.com/dpkp/kafka-python/blob/ba7372e44ffa1ee49fb4d5efbd67534393e944db/kafka/errors.py#L375-L378 So we should not be raising it for client-side errors. I realize that semantically this seems like the appropriate error to raise. However, this is confusing when debugging... for a real-life example, see https://github.com/Parsely/pykafka/issues/697. So I strongly feel that server-side errors should be kept separate from client-side errors, even if all the client is doing is proactively protecting against hitting a situation where the broker would return this error.
-rw-r--r--kafka/admin/kafka.py76
-rw-r--r--kafka/conn.py2
-rw-r--r--kafka/errors.py4
3 files changed, 44 insertions, 38 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index fbbbcc2..01db6a9 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -5,7 +5,8 @@ import logging
import socket
from kafka.client_async import KafkaClient, selectors
from kafka.errors import (
- KafkaConfigurationError, UnsupportedVersionError, NodeNotReadyError, NotControllerError, KafkaConnectionError)
+ IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
+ NodeNotReadyError, NotControllerError)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
@@ -25,9 +26,11 @@ class KafkaAdmin(object):
nicer, more pythonic objects. Unfortunately, this will likely break
those interfaces.
- The KafkaAdmin class will negotiate for the latest version of each message protocol format supported
- by both the kafka-python client library and the kafka broker. Usage of optional fields from protocol
- versions that are not supported by the broker will result in UnsupportedVersionError exceptions.
+ The KafkaAdmin class will negotiate for the latest version of each message
+ protocol format supported by both the kafka-python client library and the
+ kafka broker. Usage of optional fields from protocol versions that are not
+ supported by the broker will result in IncompatibleBrokerVersion exceptions.
+
Use of this class requires a minimum broker version >= 0.10.0.0.
@@ -223,8 +226,8 @@ class KafkaAdmin(object):
if version < self._client.get_api_versions()[operation[0].API_KEY][0]:
# max library version is less than min broker version. Not sure any brokers
# actually set a min version greater than 0 right now, tho. But maybe in the future?
- raise UnsupportedVersionError(
- "Could not find matching protocol version for {}"
+ raise IncompatibleBrokerVersion(
+ "No version of the '{}' kafka protocol is supported by both the client and broker."
.format(operation.__name__))
return version
@@ -246,9 +249,9 @@ class KafkaAdmin(object):
self._controller_id = response.controller_id
version = self._client.check_version(self._controller_id)
if version < (0, 10, 0):
- raise UnsupportedVersionError(
- "Kafka Admin interface not supported for cluster controller version {} < 0.10.0.0"
- .format(version))
+ raise IncompatibleBrokerVersion(
+ "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
+ .format(version))
def _send_request_to_node(self, node, request):
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.
@@ -311,9 +314,9 @@ class KafkaAdmin(object):
timeout_ms = self._validate_timeout(timeout_ms)
if version == 0:
if validate_only:
- raise UnsupportedVersionError(
- "validate_only not supported on cluster version {}"
- .format(self.config['api_version']))
+ raise IncompatibleBrokerVersion(
+ "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
+ .format(self.config['api_version']))
request = CreateTopicsRequest[version](
create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout = timeout_ms
@@ -326,10 +329,9 @@ class KafkaAdmin(object):
validate_only = validate_only
)
else:
- raise UnsupportedVersionError(
- "missing implementation of CreateTopics for library supported version {}"
- .format(version)
- )
+ raise NotImplementedError(
+ "Support for CreateTopics v{} has not yet been added to KafkaAdmin."
+ .format(version))
return self._send(request)
def delete_topics(self, topics, timeout_ms=None):
@@ -347,9 +349,9 @@ class KafkaAdmin(object):
timeout = timeout_ms
)
else:
- raise UnsupportedVersionError(
- "missing implementation of DeleteTopics for library supported version {}"
- .format(version))
+ raise NotImplementedError(
+ "Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
+ .format(version))
return self._send(request)
# list topics functionality is in ClusterMetadata
@@ -386,9 +388,9 @@ class KafkaAdmin(object):
version = self._matching_api_version(DescribeConfigsRequest)
if version == 0:
if include_synonyms:
- raise UnsupportedVersionError(
- "include_synonyms not supported on cluster version {}"
- .format(self.config['api_version']))
+ raise IncompatibleBrokerVersion(
+ "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
+ .format(self.config['api_version']))
request = DescribeConfigsRequest[version](
resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
)
@@ -399,9 +401,9 @@ class KafkaAdmin(object):
include_synonyms = include_synonyms
)
else:
- raise UnsupportedVersionError(
- "missing implementation of DescribeConfigs for library supported version {}"
- .format(version))
+ raise NotImplementedError(
+ "Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
+ .format(version))
return self._send(request)
@staticmethod
@@ -426,9 +428,9 @@ class KafkaAdmin(object):
resources = [self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
else:
- raise UnsupportedVersionError(
- "missing implementation of AlterConfigs for library supported version {}"
- .format(version))
+ raise NotImplementedError(
+ "Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
+ .format(version))
return self._send(request)
# alter replica logs dir protocol not implemented
@@ -463,9 +465,9 @@ class KafkaAdmin(object):
validate_only = validate_only
)
else:
- raise UnsupportedVersionError(
- "missing implementation of CreatePartitions for library supported version {}"
- .format(version))
+ raise NotImplementedError(
+ "Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
+ .format(version))
return self._send(request)
# delete records protocol not implemented
@@ -490,9 +492,9 @@ class KafkaAdmin(object):
groups = group_ids
)
else:
- raise UnsupportedVersionError(
- "missing implementation of DescribeGroups for library supported version {}"
- .format(version))
+ raise NotImplementedError(
+ "Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
+ .format(version))
return self._send(request)
def list_consumer_groups(self):
@@ -504,9 +506,9 @@ class KafkaAdmin(object):
if version <= 1:
request = ListGroupsRequest[version]()
else:
- raise UnsupportedVersionError(
- "missing implementation of ListGroups for library supported version {}"
- .format(version))
+ raise NotImplementedError(
+ "Support for ListGroups v{} has not yet been added to KafkaAdmin."
+ .format(version))
return self._send(request)
# delete groups protocol not implemented
diff --git a/kafka/conn.py b/kafka/conn.py
index 5ec9757..471bae7 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -881,7 +881,7 @@ class BrokerConnection(object):
.format(version))
# _api_versions is set as a side effect of check_versions() on a cluster
# that supports 0.10.0 or later
- return self._api_versions;
+ return self._api_versions
def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
diff --git a/kafka/errors.py b/kafka/errors.py
index fb9576c..118e430 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -62,6 +62,10 @@ class UnrecognizedBrokerVersion(KafkaError):
pass
+class IncompatibleBrokerVersion(KafkaError):
+ pass
+
+
class CommitFailedError(KafkaError):
def __init__(self, *args, **kwargs):
super(CommitFailedError, self).__init__(