From 2a41fa1fe4cee892604786f460e916dc0d96378f Mon Sep 17 00:00:00 2001 From: Mike Fischer Date: Mon, 24 Apr 2017 13:24:15 +0800 Subject: Deal with brokers that disappear, reappear with different IP address (#1085) When KafkaClient connects to a broker in _maybe_connect, it inserts into self._conns a BrokerConnection configured with the current host/port for that node. The BrokerConnection remains there forever, though, so if the broker's IP or host ever changes, KafkaClient has no way to deal with this. The fix is to compare the latest metadata with the current node's connection, and if the host or port has changed, decommission the old connection and allow a new one to be created. There's also a common race condition on broker startup where the initial metadata request sometimes returns an empty list of brokers, but subsequent requests behave normally. So, we must deal with broker being None here. This change is conservative in that it doesn't remove the connection from self._conns unless the new broker metadata contains an entry for that same node with a new IP/port. 
--- kafka/client_async.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index e1b10b3..0b08415 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -324,8 +324,19 @@ class KafkaClient(object): def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" + broker = self.cluster.broker_metadata(node_id) + + # If broker metadata indicates that a node's host/port has changed, remove it + if node_id in self._conns and broker is not None: + conn = self._conns[node_id] + host, _, __ = get_ip_port_afi(broker.host) + if conn.host != host or conn.port != broker.port: + log.debug("Closing connection to decommissioned node %s at %s:%s", + node_id, conn.host, conn.port) + conn.close() + self._conns.pop(node_id) + if node_id not in self._conns: - broker = self.cluster.broker_metadata(node_id) assert broker, 'Broker id %s not in current metadata' % node_id log.debug("Initiating connection to node %s at %s:%s", -- cgit v1.2.1