diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2018-11-20 09:03:50 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-11-20 09:24:50 -0800 |
commit | fcc800f96f14192c44b09d1d37108377dcaed245 (patch) | |
tree | 40ebfb2b5d6766ac21f74895652e412088e5f160 /kafka | |
parent | 45196e31d5cbd4da02a81f0c459faee1f8165306 (diff) | |
download | kafka-python-fcc800f96f14192c44b09d1d37108377dcaed245.tar.gz |
Rename KafkaAdmin to KafkaAdminClient
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/__init__.py | 4 | ||||
-rw-r--r-- | kafka/admin/__init__.py | 4 | ||||
-rw-r--r-- | kafka/admin/client.py (renamed from kafka/admin/kafka.py) | 32 |
3 files changed, 20 insertions, 20 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py index fa50bf6..cafa043 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -18,7 +18,7 @@ except ImportError: logging.getLogger(__name__).addHandler(NullHandler()) -from kafka.admin import KafkaAdmin +from kafka.admin import KafkaAdminClient from kafka.consumer import KafkaConsumer from kafka.consumer.subscription_state import ConsumerRebalanceListener from kafka.producer import KafkaProducer @@ -47,7 +47,7 @@ class KafkaClient(SimpleClient): __all__ = [ - 'KafkaAdmin', + 'KafkaAdminClient', 'KafkaConsumer', 'KafkaProducer', 'KafkaClient', 'BrokerConnection', 'SimpleClient', 'SimpleProducer', 'KeyedProducer', 'RoundRobinPartitioner', 'HashedPartitioner', diff --git a/kafka/admin/__init__.py b/kafka/admin/__init__.py index 069bc7c..a300301 100644 --- a/kafka/admin/__init__.py +++ b/kafka/admin/__init__.py @@ -1,10 +1,10 @@ from __future__ import absolute_import from kafka.admin.config_resource import ConfigResource, ConfigResourceType -from kafka.admin.kafka import KafkaAdmin +from kafka.admin.client import KafkaAdminClient from kafka.admin.new_topic import NewTopic from kafka.admin.new_partitions import NewPartitions __all__ = [ - 'ConfigResource', 'ConfigResourceType', 'KafkaAdmin', 'NewTopic', 'NewPartitions' + 'ConfigResource', 'ConfigResourceType', 'KafkaAdminClient', 'NewTopic', 'NewPartitions' ] diff --git a/kafka/admin/kafka.py b/kafka/admin/client.py index c8abb4e..e25afe7 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/client.py @@ -25,7 +25,7 @@ from kafka.version import __version__ log = logging.getLogger(__name__) -class KafkaAdmin(object): +class KafkaAdminClient(object): """A class for administering the Kafka cluster. Warning: @@ -35,7 +35,7 @@ class KafkaAdmin(object): nicer, more pythonic objects. Unfortunately, this will likely break those interfaces. - The KafkaAdmin class will negotiate for the latest version of each message + The KafkaAdminClient class will negotiate for the latest version of each message protocol format supported by both the kafka-python client library and the Kafka broker. Usage of optional fields from protocol versions that are not supported by the broker will result in IncompatibleBrokerVersion exceptions. @@ -174,7 +174,7 @@ class KafkaAdmin(object): } def __init__(self, **configs): - log.debug("Starting KafkaAdmin interface.") + log.debug("Starting KafkaAdminClient with configuration: %s", configs) extra_configs = set(configs).difference(self.DEFAULT_CONFIG) if extra_configs: raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs)) @@ -200,18 +200,18 @@ class KafkaAdmin(object): self._closed = False self._refresh_controller_id() - log.debug("KafkaAdmin interface started.") + log.debug("KafkaAdminClient started.") def close(self): - """Close the KafkaAdmin connection to the Kafka broker.""" + """Close the KafkaAdminClient connection to the Kafka broker.""" if not hasattr(self, '_closed') or self._closed: - log.info("KafkaAdmin interface already closed.") + log.info("KafkaAdminClient already closed.") return self._metrics.close() self._client.close() self._closed = True - log.debug("KafkaAdmin interface has closed.") + log.debug("KafkaAdminClient is now closed.") def _matching_api_version(self, operation): """Find the latest version of the protocol operation supported by both @@ -252,7 +252,7 @@ class KafkaAdmin(object): controller_version = self._client.check_version(controller_id) if controller_version < (0, 10, 0): raise IncompatibleBrokerVersion( - "The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0." + "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0." .format(controller_version)) self._controller_id = controller_id else: @@ -391,7 +391,7 @@ class KafkaAdmin(object): ) else: raise NotImplementedError( - "Support for CreateTopics v{} has not yet been added to KafkaAdmin." + "Support for CreateTopics v{} has not yet been added to KafkaAdminClient." .format(version)) # TODO convert structs to a more pythonic interface # TODO raise exceptions if errors @@ -415,7 +415,7 @@ class KafkaAdmin(object): response = self._send_request_to_controller(request) else: raise NotImplementedError( - "Support for DeleteTopics v{} has not yet been added to KafkaAdmin." + "Support for DeleteTopics v{} has not yet been added to KafkaAdminClient." .format(version)) return response @@ -474,7 +474,7 @@ class KafkaAdmin(object): ) else: raise NotImplementedError( - "Support for DescribeConfigs v{} has not yet been added to KafkaAdmin." + "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient." .format(version)) return self._send_request_to_node(self._client.least_loaded_node(), request) @@ -507,7 +507,7 @@ class KafkaAdmin(object): ) else: raise NotImplementedError( - "Support for AlterConfigs v{} has not yet been added to KafkaAdmin." + "Support for AlterConfigs v{} has not yet been added to KafkaAdminClient." .format(version)) # TODO the Java client has the note: # // We must make a separate AlterConfigs request for every BROKER resource we want to alter @@ -553,7 +553,7 @@ class KafkaAdmin(object): ) else: raise NotImplementedError( - "Support for CreatePartitions v{} has not yet been added to KafkaAdmin." + "Support for CreatePartitions v{} has not yet been added to KafkaAdminClient." .format(version)) return self._send_request_to_controller(request) @@ -625,7 +625,7 @@ class KafkaAdmin(object): group_descriptions.append(group_description) else: raise NotImplementedError( - "Support for DescribeGroups v{} has not yet been added to KafkaAdmin." + "Support for DescribeGroups v{} has not yet been added to KafkaAdminClient." .format(version)) return group_descriptions @@ -674,7 +674,7 @@ class KafkaAdmin(object): consumer_groups.update(response.groups) else: raise NotImplementedError( - "Support for ListGroups v{} has not yet been added to KafkaAdmin." + "Support for ListGroups v{} has not yet been added to KafkaAdminClient." .format(version)) return list(consumer_groups) @@ -744,7 +744,7 @@ class KafkaAdmin(object): group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata) else: raise NotImplementedError( - "Support for OffsetFetch v{} has not yet been added to KafkaAdmin." + "Support for OffsetFetch v{} has not yet been added to KafkaAdminClient." .format(version)) return group_offsets_listing |