summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-06-18 16:32:01 -0700
committerDana Powers <dana.powers@gmail.com>2016-06-18 16:32:01 -0700
commit66f3ce81a23ab19f647a56f4845a0a6c4cb86c52 (patch)
tree27deffbd96be37417eed97f56f41db186cc22e15
parent81860eeea1449678fb2d42082e08d1bc40cf1f30 (diff)
downloadkafka-python-least_loaded_node.tar.gz
Update KafkaClient.least_loaded_nodeleast_loaded_node
- Main node loop should check all known brokers, not just conn objects, which is consistent with the official java client. - This fixes a bug which could cause least_loaded_node to always return the same unavailable node
-rw-r--r--kafka/client_async.py40
1 files changed, 17 insertions, 23 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 62b0095..8ce436e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -524,24 +524,21 @@ class KafkaClient(object):
Returns:
node_id or None if no suitable node was found
"""
- nodes = list(self._conns.keys())
+ nodes = [broker.nodeId for broker in self.cluster.brokers()]
random.shuffle(nodes)
- # If there's a lingering bootstrap node, always try it last
- # really we should just kill this connection
- if 'bootstrap' in nodes:
- nodes.remove('bootstrap')
- nodes.append('bootstrap')
-
inflight = float('inf')
found = None
for node_id in nodes:
- conn = self._conns[node_id]
- curr_inflight = len(conn.in_flight_requests)
- if curr_inflight == 0 and conn.connected():
- # if we find an established connection with no in-flight requests we can stop right away
+ conn = self._conns.get(node_id)
+ connected = conn is not None and conn.connected()
+ blacked_out = conn is not None and conn.blacked_out()
+ curr_inflight = len(conn.in_flight_requests) if conn else 0
+ if connected and curr_inflight == 0:
+ # if we find an established connection
+ # with no in-flight requests, we can stop right away
return node_id
- elif not conn.blacked_out() and curr_inflight < inflight:
+ elif not blacked_out and curr_inflight < inflight:
# otherwise if this is the best we have found so far, record that
inflight = curr_inflight
found = node_id
@@ -549,19 +546,16 @@ class KafkaClient(object):
if found is not None:
return found
- # if we found no connected node, return a disconnected one
- log.debug("No connected nodes found. Trying disconnected nodes.")
- for node_id in nodes:
- if not self._conns[node_id].blacked_out():
- return node_id
-
- # if still no luck, look for a node not in self._conns yet
- log.debug("No luck. Trying all broker metadata")
- for broker in self.cluster.brokers():
- if broker.nodeId not in self._conns:
- return broker.nodeId
+ # some broker versions return an empty list of broker metadata
+ # if there are no topics created yet. the bootstrap process
+ # should detect this and keep a 'bootstrap' node alive until
+ # a non-bootstrap node is connected and non-empty broker
+ # metadata is available
+ elif 'bootstrap' in self._conns:
+ return 'bootstrap'
# Last option: try to bootstrap again
+ # this should only happen if no prior bootstrap has been successful
log.error('No nodes found in metadata -- retrying bootstrap')
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
return None