Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--  kafka/client_async.py  | 149
1 file changed, 66 insertions(+), 83 deletions(-)
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 50b481e..fdf5454 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -55,7 +55,7 @@ class KafkaClient(object):
Keyword Arguments:
bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
- strings) that the consumer should contact to bootstrap initial
+ strings) that the client should contact to bootstrap initial
cluster metadata. This does not have to be the full node list.
It just needs to have at least one broker that will respond to a
Metadata API Request. Default port is 9092. If no servers are
@@ -222,76 +222,34 @@ class KafkaClient(object):
self.config['metric_group_prefix'],
weakref.proxy(self._conns))
- self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
+ self._num_bootstrap_hosts = len(collect_hosts(self.config['bootstrap_servers']))
# Check Broker Version if not set explicitly
if self.config['api_version'] is None:
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
self.config['api_version'] = self.check_version(timeout=check_timeout)
- def _bootstrap(self, hosts):
- log.info('Bootstrapping cluster metadata from %s', hosts)
- # Exponential backoff if bootstrap fails
- backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
+ def _can_bootstrap(self):
+ effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts
+ backoff_factor = 2 ** effective_failures
+ backoff_ms = min(self.config['reconnect_backoff_ms'] * backoff_factor,
+ self.config['reconnect_backoff_max_ms'])
+
+ backoff_ms *= random.uniform(0.8, 1.2)
+
next_at = self._last_bootstrap + backoff_ms / 1000.0
- self._refresh_on_disconnects = False
now = time.time()
if next_at > now:
- log.debug("Sleeping %0.4f before bootstrapping again", next_at - now)
- time.sleep(next_at - now)
- self._last_bootstrap = time.time()
-
- if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
- if self.config['bootstrap_topics_filter']:
- metadata_request = MetadataRequest[0](list(self.config['bootstrap_topics_filter']))
- else:
- metadata_request = MetadataRequest[0]([])
- else:
- if self.config['bootstrap_topics_filter']:
- metadata_request = MetadataRequest[1](list(self.config['bootstrap_topics_filter']))
- else:
- metadata_request = MetadataRequest[1](None)
-
- for host, port, afi in hosts:
- log.debug("Attempting to bootstrap via node at %s:%s", host, port)
- cb = functools.partial(WeakMethod(self._conn_state_change), 'bootstrap')
- bootstrap = BrokerConnection(host, port, afi,
- state_change_callback=cb,
- node_id='bootstrap',
- **self.config)
- if not bootstrap.connect_blocking():
- bootstrap.close()
- continue
- future = bootstrap.send(metadata_request)
- while not future.is_done:
- self._selector.select(1)
- for r, f in bootstrap.recv():
- f.success(r)
- if future.failed():
- bootstrap.close()
- continue
- self.cluster.update_metadata(future.value)
- log.info('Bootstrap succeeded: found %d brokers and %d topics.',
- len(self.cluster.brokers()), len(self.cluster.topics()))
-
- # A cluster with no topics can return no broker metadata
- # in that case, we should keep the bootstrap connection
- if not len(self.cluster.brokers()):
- self._conns['bootstrap'] = bootstrap
- else:
- bootstrap.close()
- self._bootstrap_fails = 0
- break
- # No bootstrap found...
- else:
- log.error('Unable to bootstrap from %s', hosts)
- # Max exponential backoff is 2^12, x4000 (50ms -> 200s)
- self._bootstrap_fails = min(self._bootstrap_fails + 1, 12)
- self._refresh_on_disconnects = True
+ return False
+ return True
def _can_connect(self, node_id):
if node_id not in self._conns:
- if self.cluster.broker_metadata(node_id):
+ # cluster.broker_metadata() is stateful when called w/ 'bootstrap'
+ # (it cycles through all of the bootstrap servers)
+ # so we short-circuit here and assume that we should always have
+ # some bootstrap_servers config to power bootstrap broker_metadata
+ if node_id == 'bootstrap' or self.cluster.broker_metadata(node_id):
return True
return False
conn = self._conns[node_id]
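The backoff arithmetic in _can_bootstrap above is easier to see in isolation. A minimal standalone sketch follows; the 50ms and 1000ms figures are assumed defaults for reconnect_backoff_ms and reconnect_backoff_max_ms, used here purely for illustration:

    import random

    def bootstrap_backoff_ms(bootstrap_fails, num_bootstrap_hosts,
                             reconnect_backoff_ms=50,          # assumed default
                             reconnect_backoff_max_ms=1000):   # assumed default
        # A full pass over every bootstrap host counts as one effective
        # failure, so the exponent grows once per round of the host list.
        effective_failures = bootstrap_fails // num_bootstrap_hosts
        backoff_ms = min(reconnect_backoff_ms * 2 ** effective_failures,
                         reconnect_backoff_max_ms)
        # +/- 20% jitter so clients don't retry in lockstep
        return backoff_ms * random.uniform(0.8, 1.2)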
@@ -308,6 +266,9 @@ class KafkaClient(object):
except KeyError:
self._selector.modify(conn._sock, selectors.EVENT_WRITE)
+ if node_id == 'bootstrap':
+ self._last_bootstrap = time.time()
+
elif conn.connected():
log.debug("Node %s connected", node_id)
if node_id in self._connecting:
@@ -323,12 +284,12 @@ class KafkaClient(object):
self._idle_expiry_manager.update(node_id)
- if 'bootstrap' in self._conns and node_id != 'bootstrap':
+ if node_id == 'bootstrap':
+ self._bootstrap_fails = 0
+
+ elif 'bootstrap' in self._conns:
bootstrap = self._conns.pop('bootstrap')
- # XXX: make conn.close() require error to cause refresh
- self._refresh_on_disconnects = False
bootstrap.close()
- self._refresh_on_disconnects = True
# Connection failures imply that our metadata is stale, so let's refresh
elif conn.state is ConnectionStates.DISCONNECTING:
@@ -347,7 +308,10 @@ class KafkaClient(object):
idle_disconnect = True
self._idle_expiry_manager.remove(node_id)
- if self._refresh_on_disconnects and not self._closed and not idle_disconnect:
+ if node_id == 'bootstrap':
+ self._bootstrap_fails += 1
+
+ elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
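Taken together with the previous hunk, the lifecycle of _bootstrap_fails is: each bootstrap disconnect increments it, a successful bootstrap connection resets it to zero, and _can_bootstrap divides it by the number of bootstrap hosts before exponentiating. A hypothetical trace with three bootstrap hosts (50ms base and 1000ms cap assumed, jitter ignored):

    fails, hosts, base_ms, max_ms = 0, 3, 50, 1000
    for disconnect in range(1, 8):
        fails += 1                      # a bootstrap connection just failed
        effective = fails // hosts      # one doubling per full pass over hosts
        print(disconnect, min(base_ms * 2 ** effective, max_ms))
    # prints 50, 50, 100, 100, 100, 200, 200 (ms); a successful bootstrap
    # connection would reset fails to 0 and start the sequence over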
@@ -362,13 +326,40 @@ class KafkaClient(object):
return True
return False
+ def _should_recycle_connection(self, conn):
+ # Never recycle unless disconnected
+ if not conn.disconnected():
+ return False
+
+ # Always recycle disconnected bootstraps
+ elif conn.node_id == 'bootstrap':
+ return True
+
+ # Otherwise, only recycle when broker metadata has changed
+ broker = self.cluster.broker_metadata(conn.node_id)
+ if broker is None:
+ return False
+
+ host, _, afi = get_ip_port_afi(broker.host)
+ if conn.host != host or conn.port != broker.port:
+ log.info("Broker metadata change detected for node %s"
+ " from %s:%s to %s:%s", conn.node_id, conn.host, conn.port,
+ broker.host, broker.port)
+ return True
+
+ return False
+
def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
with self._lock:
- broker = self.cluster.broker_metadata(node_id)
conn = self._conns.get(node_id)
if conn is None:
+ # Note that when bootstrapping, each call to broker_metadata may
+ # return a different host/port. So we need to be careful to only
+ # call when necessary to avoid skipping some possible bootstrap
+ # source.
+ broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % (node_id,)
log.debug("Initiating connection to node %s at %s:%s",
@@ -382,17 +373,9 @@ class KafkaClient(object):
self._conns[node_id] = conn
# Check if existing connection should be recreated because host/port changed
- elif conn.disconnected() and broker is not None:
- host, _, __ = get_ip_port_afi(broker.host)
- if conn.host != host or conn.port != broker.port:
- log.info("Broker metadata change detected for node %s"
- " from %s:%s to %s:%s", node_id, conn.host, conn.port,
- broker.host, broker.port)
-
- # Drop old connection object.
- # It will be recreated on next _maybe_connect
- self._conns.pop(node_id)
- return False
+ elif self._should_recycle_connection(conn):
+ self._conns.pop(node_id)
+ return False
elif conn.connected():
return True
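The recycle decision itself can be exercised on its own. A hedged sketch with made-up stand-ins (FakeConn and FakeBroker are not kafka-python classes, and the host normalization the real code does via get_ip_port_afi is omitted):

    from collections import namedtuple

    FakeBroker = namedtuple('FakeBroker', ['host', 'port'])

    class FakeConn(object):
        def __init__(self, node_id, host, port, disconnected):
            self.node_id, self.host, self.port = node_id, host, port
            self._disconnected = disconnected
        def disconnected(self):
            return self._disconnected

    def should_recycle(conn, broker):
        # Mirrors _should_recycle_connection: never recycle a live connection,
        # always recycle a dead bootstrap connection, and otherwise recycle
        # only when the advertised host/port no longer matches the connection.
        if not conn.disconnected():
            return False
        if conn.node_id == 'bootstrap':
            return True
        if broker is None:
            return False
        return (conn.host, conn.port) != (broker.host, broker.port)

    # e.g. a broker whose advertised address moved between metadata refreshes
    should_recycle(FakeConn(0, 'kafka-1', 9092, disconnected=True),
                   FakeBroker('kafka-2', 9092))   # -> True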
@@ -713,7 +696,8 @@ class KafkaClient(object):
This method will prefer a node with an existing connection and no
in-flight-requests. If no such node is found, a node will be chosen
randomly from disconnected nodes that are not "blacked out" (i.e.,
- are not subject to a reconnect backoff).
+ are not subject to a reconnect backoff). If no node metadata has been
+ obtained, will return 'bootstrap' (subject to exponential backoff).
Returns:
node_id or None if no suitable node was found
@@ -740,12 +724,8 @@ class KafkaClient(object):
if found is not None:
return found
- # some broker versions return an empty list of broker metadata
- # if there are no topics created yet. the bootstrap process
- # should detect this and keep a 'bootstrap' node alive until
- # a non-bootstrap node is connected and non-empty broker
- # metadata is available
- elif 'bootstrap' in self._conns:
+ elif not nodes and self._can_bootstrap():
+ self._last_bootstrap = time.time()
return 'bootstrap'
return None
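In other words, the 'bootstrap' fallback is now reached only when no broker metadata exists at all, and it is rate-limited by the same exponential backoff as real reconnects. A simplified sketch of the selection order (in-flight-request bookkeeping elided):

    def pick_node(ready_node, known_broker_ids, can_bootstrap):
        # Prefer a usable broker chosen by the normal least-loaded logic;
        # with no broker metadata at all, fall back to the synthetic
        # 'bootstrap' node, subject to the bootstrap backoff check.
        if ready_node is not None:
            return ready_node
        if not known_broker_ids and can_bootstrap():
            return 'bootstrap'
        return None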
@@ -805,6 +785,9 @@ class KafkaClient(object):
if self._can_send_request(node_id):
topics = list(self._topics)
+ if not topics and node_id == 'bootstrap':
+ topics = list(self.config['bootstrap_topics_filter'])
+
if self.cluster.need_all_topic_metadata or not topics:
topics = [] if self.config['api_version'] < (0, 10) else None
api_version = 0 if self.config['api_version'] < (0, 10) else 1
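For context, a hedged sketch of how the topic list for the metadata refresh ends up being chosen once 'bootstrap' is the selected node (the need_all_topic_metadata branch is left out for brevity; the []-vs-None semantics follow the code above):

    def metadata_request_topics(subscribed, bootstrap_topics_filter,
                                api_version, node_id):
        topics = list(subscribed)
        # with no subscriptions yet, the bootstrap node falls back to the
        # configured bootstrap_topics_filter
        if not topics and node_id == 'bootstrap':
            topics = list(bootstrap_topics_filter)
        if not topics:
            # MetadataRequest v0 uses [] for "all topics"; v1 uses None
            topics = [] if api_version < (0, 10) else None
        return topics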