summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-08-01 18:14:59 -0700
committerGitHub <noreply@github.com>2016-08-01 18:14:59 -0700
commit40e2faa9abfb0c2bbf0c54065b1d5e2298b1f7b5 (patch)
treeaec4c2c76ddab4690397e54b6e71e5b4c6ff006a
parent99891adfa0b4e6baa0877c50e7fb2d3951ec5b39 (diff)
downloadkafka-python-40e2faa9abfb0c2bbf0c54065b1d5e2298b1f7b5.tar.gz
Metadata with_partitions() (#787)
add method ClusterMetadata.with_partitions also fixup ClusterMetadata __str__
-rw-r--r--kafka/cluster.py21
1 files changed, 20 insertions, 1 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index b7c0135..4646378 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -326,6 +326,25 @@ class ClusterMetadata(object):
self._groups[group] = node_id
return True
+ def with_partitions(self, partitions_to_add):
+ """Returns a copy of cluster metadata with partitions added"""
+ new_metadata = ClusterMetadata(**self.config)
+ new_metadata._brokers = copy.deepcopy(self._brokers)
+ new_metadata._partitions = copy.deepcopy(self._partitions)
+ new_metadata._broker_partitions = copy.deepcopy(self._broker_partitions)
+ new_metadata._groups = copy.deepcopy(self._groups)
+ new_metadata.internal_topics = copy.deepcopy(self.internal_topics)
+ new_metadata.unauthorized_topics = copy.deepcopy(self.unauthorized_topics)
+
+ for partition in partitions_to_add:
+ new_metadata._partitions[partition.topic][partition.partition] = partition
+
+ if partition.leader is not None and partition.leader != -1:
+ new_metadata._broker_partitions[partition.leader].add(
+ TopicPartition(partition.topic, partition.partition))
+
+ return new_metadata
+
def __str__(self):
- return 'Cluster(brokers: %d, topics: %d, groups: %d)' % \
+ return 'ClusterMetadata(brokers: %d, topics: %d, groups: %d)' % \
(len(self._brokers), len(self._partitions), len(self._groups))