diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-06-18 23:18:42 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-06-18 23:18:42 -0700 |
commit | 0b5a49e58d16336c1a632a4f5e42bc4fbbb3d118 (patch) | |
tree | da91bcd96f354847b234bd04ee59b3e738ce10c7 | |
parent | 6271c02c6eebf52a6d368416db49bfa57b09ef04 (diff) | |
download | kafka-python-0b5a49e58d16336c1a632a4f5e42bc4fbbb3d118.tar.gz |
Update KafkaClient.least_loaded_node (#730)
- The main node loop should check all known brokers, not just existing conn objects,
which is consistent with the official Java client.
- This fixes a bug that could cause least_loaded_node to always
return the same unavailable node.
-rw-r--r-- | kafka/client_async.py | 40 |
1 files changed, 17 insertions, 23 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 93094e2..1276743 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -525,24 +525,21 @@ class KafkaClient(object): Returns: node_id or None if no suitable node was found """ - nodes = list(self._conns.keys()) + nodes = [broker.nodeId for broker in self.cluster.brokers()] random.shuffle(nodes) - # If there's a lingering bootstrap node, always try it last - # really we should just kill this connection - if 'bootstrap' in nodes: - nodes.remove('bootstrap') - nodes.append('bootstrap') - inflight = float('inf') found = None for node_id in nodes: - conn = self._conns[node_id] - curr_inflight = len(conn.in_flight_requests) - if curr_inflight == 0 and conn.connected(): - # if we find an established connection with no in-flight requests we can stop right away + conn = self._conns.get(node_id) + connected = conn is not None and conn.connected() + blacked_out = conn is not None and conn.blacked_out() + curr_inflight = len(conn.in_flight_requests) if conn else 0 + if connected and curr_inflight == 0: + # if we find an established connection + # with no in-flight requests, we can stop right away return node_id - elif not conn.blacked_out() and curr_inflight < inflight: + elif not blacked_out and curr_inflight < inflight: # otherwise if this is the best we have found so far, record that inflight = curr_inflight found = node_id @@ -550,19 +547,16 @@ class KafkaClient(object): if found is not None: return found - # if we found no connected node, return a disconnected one - log.debug("No connected nodes found. Trying disconnected nodes.") - for node_id in nodes: - if not self._conns[node_id].blacked_out(): - return node_id - - # if still no luck, look for a node not in self._conns yet - log.debug("No luck. 
Trying all broker metadata") - for broker in self.cluster.brokers(): - if broker.nodeId not in self._conns: - return broker.nodeId + # some broker versions return an empty list of broker metadata + # if there are no topics created yet. the bootstrap process + # should detect this and keep a 'bootstrap' node alive until + # a non-bootstrap node is connected and non-empty broker + # metadata is available + elif 'bootstrap' in self._conns: + return 'bootstrap' # Last option: try to bootstrap again + # this should only happen if no prior bootstrap has been successful log.error('No nodes found in metadata -- retrying bootstrap') self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) return None |