From 40e2faa9abfb0c2bbf0c54065b1d5e2298b1f7b5 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 1 Aug 2016 18:14:59 -0700 Subject: Metadata with_partitions() (#787) add method ClusterMetadata.with_partitions also fixup ClusterMetadata __str__ --- kafka/cluster.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) (limited to 'kafka/cluster.py') diff --git a/kafka/cluster.py b/kafka/cluster.py index b7c0135..4646378 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -326,6 +326,25 @@ class ClusterMetadata(object): self._groups[group] = node_id return True + def with_partitions(self, partitions_to_add): + """Returns a copy of cluster metadata with partitions added""" + new_metadata = ClusterMetadata(**self.config) + new_metadata._brokers = copy.deepcopy(self._brokers) + new_metadata._partitions = copy.deepcopy(self._partitions) + new_metadata._broker_partitions = copy.deepcopy(self._broker_partitions) + new_metadata._groups = copy.deepcopy(self._groups) + new_metadata.internal_topics = copy.deepcopy(self.internal_topics) + new_metadata.unauthorized_topics = copy.deepcopy(self.unauthorized_topics) + + for partition in partitions_to_add: + new_metadata._partitions[partition.topic][partition.partition] = partition + + if partition.leader is not None and partition.leader != -1: + new_metadata._broker_partitions[partition.leader].add( + TopicPartition(partition.topic, partition.partition)) + + return new_metadata + def __str__(self): - return 'Cluster(brokers: %d, topics: %d, groups: %d)' % \ + return 'ClusterMetadata(brokers: %d, topics: %d, groups: %d)' % \ (len(self._brokers), len(self._partitions), len(self._groups)) -- cgit v1.2.1