author     Jeff Widman <jeff@jeffwidman.com>    2018-11-17 04:11:52 -0800
committer  Jeff Widman <jeff@jeffwidman.com>    2018-11-18 16:47:38 -0800
commit     24f41315889c23a5ea7d0ae26c3bbd8d68ae062c (patch)
tree       99ffaa146edd2449680c408a63161b614d998752
parent     8dab14b6d73d8f1717fdeb46c79807827169fd2d (diff)
Various docstring / pep8 / code hygiene cleanups
-rw-r--r--  kafka/admin/kafka.py  157
1 file changed, 86 insertions, 71 deletions
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index ca5ad56..8e0a756 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -21,10 +21,12 @@ from kafka.protocol.metadata import MetadataRequest
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.version import __version__
+
log = logging.getLogger(__name__)
+
class KafkaAdmin(object):
- """An class for administering the kafka cluster.
+ """A class for administering the Kafka cluster.
Warning:
This is an unstable interface that was recently added and is subject to
@@ -35,10 +37,9 @@ class KafkaAdmin(object):
The KafkaAdmin class will negotiate for the latest version of each message
protocol format supported by both the kafka-python client library and the
- kafka broker. Usage of optional fields from protocol versions that are not
+ Kafka broker. Usage of optional fields from protocol versions that are not
supported by the broker will result in IncompatibleBrokerVersion exceptions.
-
Use of this class requires a minimum broker version >= 0.10.0.0.
Keyword Arguments:
@@ -167,16 +168,16 @@ class KafkaAdmin(object):
'sasl_kerberos_service_name': 'kafka',
# metrics configs
- 'metric_reporters' : [],
+ 'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
}
def __init__(self, **configs):
- log.debug("Starting Kafka administration interface")
+ log.debug("Starting KafkaAdmin interface.")
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
- raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,))
+ raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
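
As a hedged usage sketch of the constructor behavior in this hunk: any keyword argument not present in DEFAULT_CONFIG is rejected with KafkaConfigurationError before anything else happens. The bootstrap_servers option is assumed here from the other kafka-python clients; it is not shown in this hunk.

    # Hypothetical sketch, not part of this commit.
    from kafka.admin.kafka import KafkaAdmin
    from kafka.errors import KafkaConfigurationError

    admin = KafkaAdmin(bootstrap_servers='localhost:9092')  # assumed standard client option

    try:
        # Keys missing from DEFAULT_CONFIG are rejected up front.
        KafkaAdmin(bootstrap_servers='localhost:9092', not_a_real_option=True)
    except KafkaConfigurationError as err:
        print(err)  # error message names the unrecognized key(s)

    admin.close()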
@@ -189,8 +190,9 @@ class KafkaAdmin(object):
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
- self._client = KafkaClient(metrics=self._metrics, metric_group_prefix='admin',
- **self.config)
+ self._client = KafkaClient(metrics=self._metrics,
+ metric_group_prefix='admin',
+ **self.config)
# Get auto-discovered version from client if necessary
if self.config['api_version'] is None:
@@ -198,46 +200,49 @@ class KafkaAdmin(object):
self._closed = False
self._refresh_controller_id()
- log.debug('Kafka administration interface started')
+ log.debug("KafkaAdmin interface started.")
def close(self):
- """Close the administration connection to the kafka broker"""
+ """Close the KafkaAdmin connection to the Kafka broker."""
if not hasattr(self, '_closed') or self._closed:
- log.info('Kafka administration interface already closed')
+ log.info("KafkaAdmin interface already closed.")
return
self._metrics.close()
self._client.close()
self._closed = True
- log.debug('Kafka administration interface has closed')
+ log.debug("KafkaAdmin interface has closed.")
def _matching_api_version(self, operation):
- """Find matching api version, the lesser of either the latest api version the library supports, or
- the max version supported by the broker
+ """Find the latest version of the protocol operation supported by both
+ this library and the broker.
+
+ This resolves to the lesser of either the latest api version this
+ library supports, or the max version supported by the broker.
- :param operation: An operation array from kafka.protocol
- :return: The max matching version number between client and broker
+ :param operation: A list of protocol operation versions from kafka.protocol.
+ :return: The max matching version number between client and broker.
"""
version = min(len(operation) - 1,
self._client.get_api_versions()[operation[0].API_KEY][1])
if version < self._client.get_api_versions()[operation[0].API_KEY][0]:
- # max library version is less than min broker version. Not sure any brokers
- # actually set a min version greater than 0 right now, tho. But maybe in the future?
+ # max library version is less than min broker version. Currently,
+ # no Kafka versions specify a min msg version. Maybe in the future?
raise IncompatibleBrokerVersion(
- "No version of the '{}' kafka protocol is supported by both the client and broker."
+ "No version of the '{}' Kafka protocol is supported by both the client and broker."
.format(operation.__name__))
return version
def _validate_timeout(self, timeout_ms):
- """Validate the timeout is set or use the configuration default
+ """Validate the timeout is set or use the configuration default.
- :param timeout_ms: The timeout provided by api call, in milliseconds
- :return: The timeout to use for the operation
+ :param timeout_ms: The timeout provided by api call, in milliseconds.
+ :return: The timeout to use for the operation.
"""
return timeout_ms or self.config['request_timeout_ms']
def _refresh_controller_id(self):
- """Determine the kafka cluster controller."""
+ """Determine the Kafka cluster controller."""
version = self._matching_api_version(MetadataRequest)
if 1 <= version <= 6:
request = MetadataRequest[version]()
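
To make the version negotiation in _matching_api_version concrete, here is a small standalone sketch of the same arithmetic; the version numbers are invented for illustration and do not come from any particular broker.

    # Standalone illustration of the min/max negotiation above; numbers are made up.
    def pick_version(library_max, broker_min, broker_max):
        version = min(library_max, broker_max)   # lesser of library and broker maximums
        if version < broker_min:                 # still has to clear the broker's minimum
            raise RuntimeError("No mutually supported protocol version")
        return version

    # Library ships v0-v2 of a request, broker supports v1-v5 -> negotiate v2.
    assert pick_version(library_max=2, broker_min=1, broker_max=5) == 2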
@@ -293,31 +298,34 @@ class KafkaAdmin(object):
assert group_coordinator != -1
return group_coordinator
- def _send_request_to_node(self, node, request):
- """Send a kafka protocol message to a specific broker. Will block until the message result is received.
+ def _send_request_to_node(self, node_id, request):
+ """Send a Kafka protocol message to a specific broker.
- :param node: The broker id to which to send the message
- :param request: The message to send
- :return: The kafka protocol response for the message
- :exception: The exception if the message could not be sent
+ Will block until the message result is received.
+
+ :param node_id: The broker id to which to send the message.
+ :param request: The message to send.
+ :return: The Kafka protocol response for the message.
+ :exception: The exception if the message could not be sent.
"""
- while not self._client.ready(node):
- # connection to broker not ready, poll until it is or send will fail with NodeNotReadyError
+ while not self._client.ready(node_id):
+ # poll until the connection to broker is ready, otherwise send()
+ # will fail with NodeNotReadyError
self._client.poll()
- future = self._client.send(node, request)
+ future = self._client.send(node_id, request)
self._client.poll(future=future)
if future.succeeded():
return future.value
else:
- raise future.exception # pylint: disable-msg=raising-bad-type
+ raise future.exception # pylint: disable-msg=raising-bad-type
def _send_request_to_controller(self, request):
- """Send a kafka protocol message to the cluster controller.
+ """Send a Kafka protocol message to the cluster controller.
Will block until the message result is received.
- :param request: The message to send
- :return: The kafka protocol response for the message
+ :param request: The message to send.
+ :return: The Kafka protocol response for the message.
"""
tries = 2 # in case our cached self._controller_id is outdated
while tries:
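
For illustration only, this is roughly how the internal _refresh_controller_id flow from the previous hunk drives the blocking helper shown here; _matching_api_version and _send_request_to_node are private methods, and admin is assumed to be an already-constructed KafkaAdmin instance.

    # Illustration only: mirrors the internal flow shown in this diff.
    from kafka.protocol.metadata import MetadataRequest

    version = admin._matching_api_version(MetadataRequest)
    request = MetadataRequest[version]()  # no topics -> metadata for the whole cluster
    response = admin._send_request_to_node(admin._client.least_loaded_node(), request)
    # MetadataResponse v1+ carries the controller id that _refresh_controller_id caches.
    controller_id = response.controller_id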
@@ -357,11 +365,12 @@ class KafkaAdmin(object):
def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
"""Create new topics in the cluster.
- :param new_topics: Array of NewTopic objects
- :param timeout_ms: Milliseconds to wait for new topics to be created before broker returns
+ :param new_topics: A list of NewTopic objects.
+ :param timeout_ms: Milliseconds to wait for new topics to be created
+ before the broker returns.
:param validate_only: If True, don't actually create new topics.
Not supported by all versions. Default: False
- :return: Appropriate version of CreateTopicResponse class
+ :return: Appropriate version of CreateTopicResponse class.
"""
version = self._matching_api_version(CreateTopicsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
@@ -371,40 +380,44 @@ class KafkaAdmin(object):
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = CreateTopicsRequest[version](
- create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics],
- timeout = timeout_ms
+ create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
+ timeout=timeout_ms
)
elif version <= 2:
request = CreateTopicsRequest[version](
- create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics],
- timeout = timeout_ms,
- validate_only = validate_only
+ create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
+ timeout=timeout_ms,
+ validate_only=validate_only
)
else:
raise NotImplementedError(
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
.format(version))
+ # TODO convert structs to a more pythonic interface
+ # TODO raise exceptions if errors
return self._send_request_to_controller(request)
def delete_topics(self, topics, timeout_ms=None):
- """Delete topics from the cluster
+ """Delete topics from the cluster.
- :param topics: Array of topic name strings
- :param timeout_ms: Milliseconds to wait for topics to be deleted before broker returns
- :return: Appropriate version of DeleteTopicsResponse class
+ :param topics: A list of topic name strings.
+ :param timeout_ms: Milliseconds to wait for topics to be deleted
+ before the broker returns.
+ :return: Appropriate version of DeleteTopicsResponse class.
"""
version = self._matching_api_version(DeleteTopicsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
if version <= 1:
request = DeleteTopicsRequest[version](
- topics = topics,
- timeout = timeout_ms
+ topics=topics,
+ timeout=timeout_ms
)
+ response = self._send_request_to_controller(request)
else:
raise NotImplementedError(
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
.format(version))
- return self._send_request_to_controller(request)
+ return response
# list topics functionality is in ClusterMetadata
# Note: if implemented here, send the request to the least_loaded_node()
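
A hedged end-to-end sketch of create_topics and delete_topics follows; it assumes kafka-python's NewTopic helper (name, num_partitions, replication_factor) is importable from kafka.admin and that admin is an already-constructed KafkaAdmin instance.

    # Hypothetical sketch, not part of this commit.
    from kafka.admin import NewTopic  # assumed export; check your installed version

    topic = NewTopic(name='example-topic', num_partitions=3, replication_factor=1)

    # Dry-run first; validate_only requires CreateTopicsRequest >= v1.
    admin.create_topics([topic], timeout_ms=10000, validate_only=True)
    admin.create_topics([topic])

    # delete_topics takes plain topic-name strings, not NewTopic objects.
    admin.delete_topics(['example-topic'], timeout_ms=10000)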
@@ -435,14 +448,15 @@ class KafkaAdmin(object):
)
def describe_configs(self, config_resources, include_synonyms=False):
- """Fetch configuration parameters for one or more kafka resources.
+ """Fetch configuration parameters for one or more Kafka resources.
- :param config_resources: An array of ConfigResource objects.
- Any keys in ConfigResource.configs dict will be used to filter the result. The configs dict should be None
- to get all values. An empty dict will get zero values (as per kafka protocol).
- :param include_synonyms: If True, return synonyms in response. Not
+ :param config_resources: A list of ConfigResource objects.
+ Any keys in ConfigResource.configs dict will be used to filter the
+ result. Setting the configs dict to None will get all values. An
+ empty dict will get zero values (as per Kafka protocol).
+ :param include_synonyms: If True, return synonyms in response. Not
supported by all versions. Default: False.
- :return: Appropriate version of DescribeConfigsResponse class
+ :return: Appropriate version of DescribeConfigsResponse class.
"""
version = self._matching_api_version(DescribeConfigsRequest)
if version == 0:
@@ -451,12 +465,12 @@ class KafkaAdmin(object):
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = DescribeConfigsRequest[version](
- resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
+ resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
)
- elif version <= 1:
+ elif version == 1:
request = DescribeConfigsRequest[version](
- resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
- include_synonyms = include_synonyms
+ resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
+ include_synonyms=include_synonyms
)
else:
raise NotImplementedError(
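
A hedged sketch of describe_configs follows; it assumes kafka-python's ConfigResource and ConfigResourceType helpers are importable from kafka.admin and that admin is an already-constructed KafkaAdmin instance.

    # Hypothetical sketch, not part of this commit.
    from kafka.admin import ConfigResource, ConfigResourceType  # assumed exports

    # configs left as None (the default) returns every config entry for the resource.
    admin.describe_configs([ConfigResource(ConfigResourceType.TOPIC, 'example-topic')])

    # A populated configs dict filters the response to just those keys;
    # include_synonyms requires DescribeConfigsRequest >= v1.
    admin.describe_configs(
        [ConfigResource(ConfigResourceType.TOPIC, 'example-topic',
                        configs={'retention.ms': None})],
        include_synonyms=True)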
@@ -475,7 +489,7 @@ class KafkaAdmin(object):
)
def alter_configs(self, config_resources):
- """Alter configuration parameters of one or more kafka resources.
+ """Alter configuration parameters of one or more Kafka resources.
Warning:
This is currently broken for BROKER resources because those must be
@@ -483,13 +497,13 @@ class KafkaAdmin(object):
least-loaded node. See the comment in the source code for details.
We would happily accept a PR fixing this.
- :param config_resources: An array of ConfigResource objects.
- :return: Appropriate version of AlterConfigsResponse class
+ :param config_resources: A list of ConfigResource objects.
+ :return: Appropriate version of AlterConfigsResponse class.
"""
version = self._matching_api_version(AlterConfigsRequest)
if version == 0:
request = AlterConfigsRequest[version](
- resources = [self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
+ resources=[self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
)
else:
raise NotImplementedError(
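
A hedged sketch of alter_configs follows, limited to a TOPIC resource because of the BROKER caveat noted above; it reuses the assumed ConfigResource/ConfigResourceType helpers and the admin instance from the earlier sketches.

    # Hypothetical sketch, not part of this commit.
    from kafka.admin import ConfigResource, ConfigResourceType  # assumed exports

    admin.alter_configs([
        ConfigResource(ConfigResourceType.TOPIC, 'example-topic',
                       configs={'retention.ms': '86400000'}),
    ])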
@@ -522,19 +536,20 @@ class KafkaAdmin(object):
def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False):
"""Create additional partitions for an existing topic.
- :param topic_partitions: A map of topic name strings to NewPartition objects
- :param timeout_ms: Milliseconds to wait for new partitions to be created before broker returns
+ :param topic_partitions: A map of topic name strings to NewPartition objects.
+ :param timeout_ms: Milliseconds to wait for new partitions to be
+ created before the broker returns.
:param validate_only: If True, don't actually create new partitions.
Default: False
- :return: Appropriate version of CreatePartitionsResponse class
+ :return: Appropriate version of CreatePartitionsResponse class.
"""
version = self._matching_api_version(CreatePartitionsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
if version == 0:
request = CreatePartitionsRequest[version](
- topic_partitions = [self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
- timeout = timeout_ms,
- validate_only = validate_only
+ topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
+ timeout=timeout_ms,
+ validate_only=validate_only
)
else:
raise NotImplementedError(