summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2018-11-17 02:04:28 -0800
committerJeff Widman <jeff@jeffwidman.com>2018-11-18 15:16:19 -0800
commit50690884e74d1cf1075d96bca0c028bc4d8e1e60 (patch)
treec9360bd988ae54a3be085f4a87109bab1ac1c6da
parentac1a2a0a1012909faba4e711b968e5b0c3746ca5 (diff)
downloadkafka-python-50690884e74d1cf1075d96bca0c028bc4d8e1e60.tar.gz
Fix send to controller
The controller send error handling was completely broken. It also pinned the metadata version unnecessarily. Additionally, several of the methods were sending to the controller but either that was unnecessary, or just plain wrong. So updated following the pattern of the Java Admin client.
-rw-r--r--kafka/admin/kafka.py133
1 files changed, 89 insertions, 44 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index 3dc2e44..befdd86 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -6,8 +6,8 @@ import socket
from kafka.client_async import KafkaClient, selectors
import kafka.errors as Errors
from kafka.errors import (
- IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
- NodeNotReadyError, NotControllerError)
+ IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
+ UnrecognizedBrokerVersion)
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
@@ -232,17 +232,22 @@ class KafkaAdmin(object):
return timeout_ms or self.config['request_timeout_ms']
def _refresh_controller_id(self):
- """Determine the kafka cluster controller
- """
- response = self._send_request_to_node(
- self._client.least_loaded_node(),
- MetadataRequest[1]([])
- )
- self._controller_id = response.controller_id
- version = self._client.check_version(self._controller_id)
- if version < (0, 10, 0):
- raise IncompatibleBrokerVersion(
- "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
+ """Determine the kafka cluster controller."""
+ version = self._matching_api_version(MetadataRequest)
+ if 1 <= version <= 6:
+ request = MetadataRequest[version]()
+ response = self._send_request_to_node(self._client.least_loaded_node(), request)
+ controller_id = response.controller_id
+ # verify the controller is new enough to support our requests
+ controller_version = self._client.check_version(controller_id)
+ if controller_version < (0, 10, 0):
+ raise IncompatibleBrokerVersion(
+ "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
+ .format(controller_version))
+ self._controller_id = controller_id
+ else:
+ raise UnrecognizedBrokerVersion(
+ "Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
.format(version))
def _find_group_coordinator_id(self, group_id):
@@ -301,22 +306,34 @@ class KafkaAdmin(object):
else:
raise future.exception # pylint: disable-msg=raising-bad-type
- def _send(self, request):
- """Send a kafka protocol message to the cluster controller. Will block until the message result is received.
+ def _send_request_to_controller(self, request):
+ """Send a kafka protocol message to the cluster controller.
+
+ Will block until the message result is received.
:param request: The message to send
- :return The kafka protocol response for the message
- :exception NodeNotReadyError: If the controller connection can't be established
+ :return: The kafka protocol response for the message
"""
- remaining_tries = 2
- while remaining_tries > 0:
- remaining_tries = remaining_tries - 1
- try:
- return self._send_request_to_node(self._controller_id, request)
- except (NotControllerError, KafkaConnectionError) as e:
- # controller changed? refresh it
- self._refresh_controller_id()
- raise NodeNotReadyError(self._controller_id)
+ tries = 2 # in case our cached self._controller_id is outdated
+ while tries:
+ tries -= 1
+ response = self._send_request_to_node(self._controller_id, request)
+ # DeleteTopicsResponse returns topic_error_codes rather than topic_errors
+ for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes):
+ error_type = Errors.for_code(error_code)
+ if tries and isinstance(error_type, NotControllerError):
+ # No need to inspect the rest of the errors for
+ # non-retriable errors because NotControllerError should
+ # either be thrown for all errors or no errors.
+ self._refresh_controller_id()
+ break
+ elif error_type is not Errors.NoError:
+ raise error_type(
+ "Request '{}' failed with response '{}'."
+ .format(request, response))
+ else:
+ return response
+ raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")
@staticmethod
def _convert_new_topic_request(new_topic):
@@ -362,7 +379,7 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ return self._send_request_to_controller(request)
def delete_topics(self, topics, timeout_ms=None):
"""Delete topics from the cluster
@@ -382,19 +399,25 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ return self._send_request_to_controller(request)
# list topics functionality is in ClusterMetadata
+ # Note: if implemented here, send the request to the least_loaded_node()
# describe topics functionality is in ClusterMetadata
+ # Note: if implemented here, send the request to the controller
# describe cluster functionality is in ClusterMetadata
+ # Note: if implemented here, send the request to the least_loaded_node()
- # describe_acls protocol not implemented
+ # describe_acls protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
- # create_acls protocol not implemented
+ # create_acls protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
- # delete_acls protocol not implemented
+ # delete_acls protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
@staticmethod
def _convert_describe_config_resource_request(config_resource):
@@ -434,7 +457,7 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ return self._send_request_to_node(self._client.least_loaded_node(), request)
@staticmethod
def _convert_alter_config_resource_request(config_resource):
@@ -449,6 +472,12 @@ class KafkaAdmin(object):
def alter_configs(self, config_resources):
"""Alter configuration parameters of one or more kafka resources.
+ Warning:
+ This is currently broken for BROKER resources because those must be
+ sent to that specific broker, versus this always picks the
+ least-loaded node. See the comment in the source code for details.
+ We would happily accept a PR fixing this.
+
:param config_resources: An array of ConfigResource objects.
:return: Appropriate version of AlterConfigsResponse class
"""
@@ -461,11 +490,19 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ # TODO the Java client has the note:
+ # // We must make a separate AlterConfigs request for every BROKER resource we want to alter
+ # // and send the request to that specific broker. Other resources are grouped together into
+ # // a single request that may be sent to any broker.
+ #
+ # So this is currently broken as it always sends to the least_loaded_node()
+ return self._send_request_to_node(self._client.least_loaded_node(), request)
- # alter replica logs dir protocol not implemented
+ # alter replica logs dir protocol not yet implemented
+ # Note: have to lookup the broker with the replica assignment and send the request to that broker
- # describe log dirs protocol not implemented
+ # describe log dirs protocol not yet implemented
+ # Note: have to lookup the broker with the replica assignment and send the request to that broker
@staticmethod
def _convert_create_partitions_request(topic_name, new_partitions):
@@ -498,17 +535,22 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ return self._send_request_to_controller(request)
- # delete records protocol not implemented
+ # delete records protocol not yet implemented
+ # Note: send the request to the partition leaders
- # create delegation token protocol not implemented
+ # create delegation token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
- # renew delegation token protocol not implemented
+ # renew delegation token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
- # expire delegation_token protocol not implemented
+ # expire delegation_token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
- # describe delegation_token protocol not implemented
+ # describe delegation_token protocol not yet implemented
+ # Note: send the request to the least_loaded_node()
def describe_consumer_groups(self, group_ids):
"""Describe a set of consumer groups.
@@ -525,7 +567,8 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ # TODO this is completely broken, as it needs to send to the group coordinator
+ # return self._send(request)
def list_consumer_groups(self):
"""List all consumer groups known to the cluster.
@@ -539,6 +582,8 @@ class KafkaAdmin(object):
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send(request)
+ # TODO this is completely broken, as it needs to send to the group coordinator
+ # return self._send(request)
- # delete groups protocol not implemented
+ # delete groups protocol not yet implemented
+ # Note: send the request to the group's coordinator.