summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-16 11:58:58 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-16 13:47:09 -0700
commit9960f3d8d2902ae0bb57262a6e530ed219168b2c (patch)
treee537d752796e43602eda0daac98aac8622efdb50 /kafka/cluster.py
parent72bcadcaf106668ff275e03a12b9512ee4983547 (diff)
downloadkafka-python-9960f3d8d2902ae0bb57262a6e530ed219168b2c.tar.gz
Add rack to BrokerMetadata - it is always None when using MetadataRequest v0
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py19
1 files changed, 11 insertions, 8 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 9aabec1..c3b8f3c 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -189,7 +189,7 @@ class ClusterMetadata(object):
for node_id, host, port in metadata.brokers:
self._brokers.update({
- node_id: BrokerMetadata(node_id, host, port)
+ node_id: BrokerMetadata(node_id, host, port, None)
})
_new_partitions = {}
@@ -272,7 +272,8 @@ class ClusterMetadata(object):
coordinator = BrokerMetadata(
response.coordinator_id,
response.host,
- response.port)
+ response.port,
+ None)
# Assume that group coordinators are just brokers
# (this is true now, but could diverge in future)
@@ -281,12 +282,14 @@ class ClusterMetadata(object):
# If this happens, either brokers have moved without
# changing IDs, or our assumption above is wrong
- elif coordinator != self._brokers[node_id]:
- log.error("GroupCoordinator metadata conflicts with existing"
- " broker metadata. Coordinator: %s, Broker: %s",
- coordinator, self._brokers[node_id])
- self._groups[group] = node_id
- return False
+ else:
+ node = self._brokers[node_id]
+ if coordinator.host != node.host or coordinator.port != node.port:
+ log.error("GroupCoordinator metadata conflicts with existing"
+ " broker metadata. Coordinator: %s, Broker: %s",
+ coordinator, node)
+ self._groups[group] = node_id
+ return False
log.info("Group coordinator for %s is %s", group, coordinator)
self._groups[group] = node_id