diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-16 12:30:37 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-16 12:30:37 -0800 |
commit | d1daeaad2520fceba1651f4d2bd7201a5699f6be (patch) | |
tree | fea5a7f2cba9cb65524e91132ab1f7697ec12f9c /kafka/cluster.py | |
parent | 3df92c907051179227b798b42bfc876dc53e384f (diff) | |
download | kafka-python-d1daeaad2520fceba1651f4d2bd7201a5699f6be.tar.gz |
Improve kafka.cluster docstrings
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r-- | kafka/cluster.py | 72 |
1 files changed, 65 insertions, 7 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index 8c2c10e..9ab6e6e 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -42,19 +42,46 @@ class ClusterMetadata(object): self.config[key] = configs[key] def brokers(self): + """Get all BrokerMetadata + + Returns: + set: {BrokerMetadata, ...} + """ return set(self._brokers.values()) def broker_metadata(self, broker_id): + """Get BrokerMetadata + + Arguments: + broker_id (int): node_id for a broker to check + + Returns: + BrokerMetadata or None if not found + """ return self._brokers.get(broker_id) def partitions_for_topic(self, topic): - """Return set of all partitions for topic (whether available or not)""" + """Return set of all partitions for topic (whether available or not) + + Arguments: + topic (str): topic to check for partitions + + Returns: + set: {partition (int), ...} + """ if topic not in self._partitions: return None return set(self._partitions[topic].keys()) def available_partitions_for_topic(self, topic): - """Return set of partitions with known leaders""" + """Return set of partitions with known leaders + + Arguments: + topic (str): topic to check for partitions + + Returns: + set: {partition (int), ...} + """ if topic not in self._partitions: return None return set([partition for partition, metadata @@ -70,10 +97,25 @@ class ClusterMetadata(object): return self._partitions[partition.topic][partition.partition].leader def partitions_for_broker(self, broker_id): - """Return TopicPartitions for which the broker is a leader""" + """Return TopicPartitions for which the broker is a leader. + + Arguments: + broker_id (int): node id for a broker + + Returns: + set: {TopicPartition, ...} + """ return self._broker_partitions.get(broker_id) def coordinator_for_group(self, group): + """Return node_id of group coordinator. + + Arguments: + group (str): name of consumer group + + Returns: + int: node_id for group coordinator + """ return self._groups.get(group) def ttl(self): @@ -96,7 +138,8 @@ class ClusterMetadata(object): Actual update must be handled separately. This method will only change the reported ttl() - Returns: Future (value will be this cluster object after update) + Returns: + kafka.future.Future (value will be the cluster object after update) """ with self._lock: self._need_update = True @@ -105,9 +148,15 @@ class ClusterMetadata(object): return self._future def topics(self): + """Get set of known topics. + + Returns: + set: {topic (str), ...} + """ return set(self._partitions.keys()) def failed_update(self, exception): + """Update cluster state given a failed MetadataRequest.""" f = None with self._lock: if self._future: @@ -118,6 +167,13 @@ class ClusterMetadata(object): self._last_refresh_ms = time.time() * 1000 def update_metadata(self, metadata): + """Update cluster state given a MetadataResponse. + + Arguments: + metadata (MetadataResponse): broker response to a metadata request + + Returns: None + """ # In the common case where we ask for a single topic and get back an # error, we should fail the future if len(metadata.topics) == 1 and metadata.topics[0][0] != 0: @@ -195,10 +251,12 @@ class ClusterMetadata(object): def add_group_coordinator(self, group, response): """Update with metadata for a group coordinator - group: name of group from GroupCoordinatorRequest - response: GroupCoordinatorResponse + Arguments: + group (str): name of group from GroupCoordinatorRequest + response (GroupCoordinatorResponse): broker response - returns True if metadata is updated, False on error + Returns: + bool: True if metadata is updated, False on error """ log.debug("Updating coordinator for %s: %s", group, response) error_type = Errors.for_code(response.error_code) |