summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-16 12:30:37 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-16 12:30:37 -0800
commitd1daeaad2520fceba1651f4d2bd7201a5699f6be (patch)
treefea5a7f2cba9cb65524e91132ab1f7697ec12f9c /kafka/cluster.py
parent3df92c907051179227b798b42bfc876dc53e384f (diff)
downloadkafka-python-d1daeaad2520fceba1651f4d2bd7201a5699f6be.tar.gz
Improve kafka.cluster docstrings
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py72
1 files changed, 65 insertions, 7 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 8c2c10e..9ab6e6e 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -42,19 +42,46 @@ class ClusterMetadata(object):
self.config[key] = configs[key]
def brokers(self):
+ """Get all BrokerMetadata
+
+ Returns:
+ set: {BrokerMetadata, ...}
+ """
return set(self._brokers.values())
def broker_metadata(self, broker_id):
+ """Get BrokerMetadata
+
+ Arguments:
+ broker_id (int): node_id for a broker to check
+
+ Returns:
+ BrokerMetadata or None if not found
+ """
return self._brokers.get(broker_id)
def partitions_for_topic(self, topic):
- """Return set of all partitions for topic (whether available or not)"""
+ """Return set of all partitions for topic (whether available or not)
+
+ Arguments:
+ topic (str): topic to check for partitions
+
+ Returns:
+ set: {partition (int), ...}
+ """
if topic not in self._partitions:
return None
return set(self._partitions[topic].keys())
def available_partitions_for_topic(self, topic):
- """Return set of partitions with known leaders"""
+ """Return set of partitions with known leaders
+
+ Arguments:
+ topic (str): topic to check for partitions
+
+ Returns:
+ set: {partition (int), ...}
+ """
if topic not in self._partitions:
return None
return set([partition for partition, metadata
@@ -70,10 +97,25 @@ class ClusterMetadata(object):
return self._partitions[partition.topic][partition.partition].leader
def partitions_for_broker(self, broker_id):
- """Return TopicPartitions for which the broker is a leader"""
+ """Return TopicPartitions for which the broker is a leader.
+
+ Arguments:
+ broker_id (int): node id for a broker
+
+ Returns:
+ set: {TopicPartition, ...}
+ """
return self._broker_partitions.get(broker_id)
def coordinator_for_group(self, group):
+ """Return node_id of group coordinator.
+
+ Arguments:
+ group (str): name of consumer group
+
+ Returns:
+ int: node_id for group coordinator
+ """
return self._groups.get(group)
def ttl(self):
@@ -96,7 +138,8 @@ class ClusterMetadata(object):
Actual update must be handled separately. This method will only
change the reported ttl()
- Returns: Future (value will be this cluster object after update)
+ Returns:
+ kafka.future.Future (value will be the cluster object after update)
"""
with self._lock:
self._need_update = True
@@ -105,9 +148,15 @@ class ClusterMetadata(object):
return self._future
def topics(self):
+ """Get set of known topics.
+
+ Returns:
+ set: {topic (str), ...}
+ """
return set(self._partitions.keys())
def failed_update(self, exception):
+ """Update cluster state given a failed MetadataRequest."""
f = None
with self._lock:
if self._future:
@@ -118,6 +167,13 @@ class ClusterMetadata(object):
self._last_refresh_ms = time.time() * 1000
def update_metadata(self, metadata):
+ """Update cluster state given a MetadataResponse.
+
+ Arguments:
+ metadata (MetadataResponse): broker response to a metadata request
+
+ Returns: None
+ """
# In the common case where we ask for a single topic and get back an
# error, we should fail the future
if len(metadata.topics) == 1 and metadata.topics[0][0] != 0:
@@ -195,10 +251,12 @@ class ClusterMetadata(object):
def add_group_coordinator(self, group, response):
"""Update with metadata for a group coordinator
- group: name of group from GroupCoordinatorRequest
- response: GroupCoordinatorResponse
+ Arguments:
+ group (str): name of group from GroupCoordinatorRequest
+ response (GroupCoordinatorResponse): broker response
- returns True if metadata is updated, False on error
+ Returns:
+ bool: True if metadata is updated, False on error
"""
log.debug("Updating coordinator for %s: %s", group, response)
error_type = Errors.for_code(response.error_code)