summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Fischer <mike@originstech.com>2017-04-24 13:24:15 +0800
committerDana Powers <dana.powers@gmail.com>2017-06-18 23:17:31 -0700
commit2a41fa1fe4cee892604786f460e916dc0d96378f (patch)
treebc37ae275b6506ec412fe22abcec34c2bd3a115f
parent26a810220acbca57200a805132c5f32108a7fc9c (diff)
downloadkafka-python-2a41fa1fe4cee892604786f460e916dc0d96378f.tar.gz
Deal with brokers that disappear, reappear with different IP address (#1085)
When KafkaClient connects to a broker in _maybe_connect, it inserts into self._conns a BrokerConnection configured with the current host/port for that node. The BrokerConnection remains there forever, though, so if the broker's IP or host ever changes, KafkaClient has no way to deal with this. The fix is to compare the latest metadata with the current node's connection, and if the host/IP has changed, decommission the old connection and allow a new one to be created. There's also a common race condition on broker startup where the initial metadata request sometimes returns an empty list of brokers, but subsequent requests behave normally. So, we must deal with broker being None here. This change is conservative in that it doesn't remove the connection from self._conns unless the new broker metadata contains an entry for that same node with a new IP/port.
-rw-r--r--kafka/client_async.py13
1 files changed, 12 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e1b10b3..0b08415 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -324,8 +324,19 @@ class KafkaClient(object):
def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
+ broker = self.cluster.broker_metadata(node_id)
+
+ # If broker metadata indicates that a node's host/port has changed, remove it
+ if node_id in self._conns and broker is not None:
+ conn = self._conns[node_id]
+ host, _, __ = get_ip_port_afi(broker.host)
+ if conn.host != host or conn.port != broker.port:
+ log.debug("Closing connection to decommissioned node %s at %s:%s",
+ node_id, conn.host, conn.port)
+ conn.close()
+ self._conns.pop(node_id)
+
if node_id not in self._conns:
- broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % node_id
log.debug("Initiating connection to node %s at %s:%s",