summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-06-18 23:18:41 -0700
committerDana Powers <dana.powers@gmail.com>2017-06-18 23:18:41 -0700
commitbbbac3dc3678df069ef72ecfea62d435bc519a07 (patch)
tree94f7e3afab1b57b9c0e94da11fbe4e70ca313a08
parent2a41fa1fe4cee892604786f460e916dc0d96378f (diff)
downloadkafka-python-bbbac3dc3678df069ef72ecfea62d435bc519a07.tar.gz
Fixup for #1085 -- only check for changed metadata on disconnected nodes
-rw-r--r--kafka/client_async.py40
1 files changed, 23 insertions, 17 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 0b08415..d8c2389 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -325,31 +325,37 @@ class KafkaClient(object):
def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
broker = self.cluster.broker_metadata(node_id)
+ conn = self._conns.get(node_id)
- # If broker metadata indicates that a node's host/port has changed, remove it
- if node_id in self._conns and broker is not None:
- conn = self._conns[node_id]
- host, _, __ = get_ip_port_afi(broker.host)
- if conn.host != host or conn.port != broker.port:
- log.debug("Closing connection to decommissioned node %s at %s:%s",
- node_id, conn.host, conn.port)
- conn.close()
- self._conns.pop(node_id)
-
- if node_id not in self._conns:
+ if conn is None:
assert broker, 'Broker id %s not in current metadata' % node_id
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
host, port, afi = get_ip_port_afi(broker.host)
cb = functools.partial(self._conn_state_change, node_id)
- self._conns[node_id] = BrokerConnection(host, broker.port, afi,
- state_change_callback=cb,
- node_id=node_id,
- **self.config)
- conn = self._conns[node_id]
- if conn.connected():
+ conn = BrokerConnection(host, broker.port, afi,
+ state_change_callback=cb,
+ node_id=node_id,
+ **self.config)
+ self._conns[node_id] = conn
+
+ # Check if existing connection should be recreated because host/port changed
+ elif conn.disconnected() and broker is not None:
+ host, _, __ = get_ip_port_afi(broker.host)
+ if conn.host != host or conn.port != broker.port:
+ log.info("Broker metadata change detected for node %s"
+ " from %s:%s to %s:%s", node_id, conn.host, conn.port,
+ broker.host, broker.port)
+
+ # Drop old connection object.
+ # It will be recreated on next _maybe_connect
+ self._conns.pop(node_id)
+ return False
+
+ elif conn.connected():
return True
+
conn.connect()
return conn.connected()