summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-14 09:39:28 -0700
committerJeff Widman <jeff@jeffwidman.com>2019-03-14 09:39:28 -0700
commit812de351f75beefe73bd9bef55847ab61ccc951d (patch)
treec0122d099cbe0ff0c2c3a1adf76dc09493ae8bcf
parent703f06590be2daa7e4592b3d82df6d719a6829bb (diff)
downloadkafka-python-812de351f75beefe73bd9bef55847ab61ccc951d.tar.gz
Retry bootstrapping after backoff when necessary (#1736)
The current client attempts to bootstrap once during initialization, but if it fails there is no second attempt and the client will be inoperable. This can happen, for example, if an entire cluster is down at the time a long-running client starts execution. This commit attempts to fix this by removing the synchronous bootstrapping from `KafkaClient` init, and instead merges bootstrap metadata with the cluster metadata. The Java client uses a similar approach. This allows us to continue falling back to bootstrap data when necessary throughout the life of a long-running consumer or producer. Fix #1670
-rw-r--r--kafka/client_async.py149
-rw-r--r--kafka/cluster.py24
-rw-r--r--test/test_client_async.py68
3 files changed, 112 insertions, 129 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 50b481e..fdf5454 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -55,7 +55,7 @@ class KafkaClient(object):
Keyword Arguments:
bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
- strings) that the consumer should contact to bootstrap initial
+ strings) that the client should contact to bootstrap initial
cluster metadata. This does not have to be the full node list.
It just needs to have at least one broker that will respond to a
Metadata API Request. Default port is 9092. If no servers are
@@ -222,76 +222,34 @@ class KafkaClient(object):
self.config['metric_group_prefix'],
weakref.proxy(self._conns))
- self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
+ self._num_bootstrap_hosts = len(collect_hosts(self.config['bootstrap_servers']))
# Check Broker Version if not set explicitly
if self.config['api_version'] is None:
check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
self.config['api_version'] = self.check_version(timeout=check_timeout)
- def _bootstrap(self, hosts):
- log.info('Bootstrapping cluster metadata from %s', hosts)
- # Exponential backoff if bootstrap fails
- backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
+ def _can_bootstrap(self):
+ effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts
+ backoff_factor = 2 ** effective_failures
+ backoff_ms = min(self.config['reconnect_backoff_ms'] * backoff_factor,
+ self.config['reconnect_backoff_max_ms'])
+
+ backoff_ms *= random.uniform(0.8, 1.2)
+
next_at = self._last_bootstrap + backoff_ms / 1000.0
- self._refresh_on_disconnects = False
now = time.time()
if next_at > now:
- log.debug("Sleeping %0.4f before bootstrapping again", next_at - now)
- time.sleep(next_at - now)
- self._last_bootstrap = time.time()
-
- if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
- if self.config['bootstrap_topics_filter']:
- metadata_request = MetadataRequest[0](list(self.config['bootstrap_topics_filter']))
- else:
- metadata_request = MetadataRequest[0]([])
- else:
- if self.config['bootstrap_topics_filter']:
- metadata_request = MetadataRequest[1](list(self.config['bootstrap_topics_filter']))
- else:
- metadata_request = MetadataRequest[1](None)
-
- for host, port, afi in hosts:
- log.debug("Attempting to bootstrap via node at %s:%s", host, port)
- cb = functools.partial(WeakMethod(self._conn_state_change), 'bootstrap')
- bootstrap = BrokerConnection(host, port, afi,
- state_change_callback=cb,
- node_id='bootstrap',
- **self.config)
- if not bootstrap.connect_blocking():
- bootstrap.close()
- continue
- future = bootstrap.send(metadata_request)
- while not future.is_done:
- self._selector.select(1)
- for r, f in bootstrap.recv():
- f.success(r)
- if future.failed():
- bootstrap.close()
- continue
- self.cluster.update_metadata(future.value)
- log.info('Bootstrap succeeded: found %d brokers and %d topics.',
- len(self.cluster.brokers()), len(self.cluster.topics()))
-
- # A cluster with no topics can return no broker metadata
- # in that case, we should keep the bootstrap connection
- if not len(self.cluster.brokers()):
- self._conns['bootstrap'] = bootstrap
- else:
- bootstrap.close()
- self._bootstrap_fails = 0
- break
- # No bootstrap found...
- else:
- log.error('Unable to bootstrap from %s', hosts)
- # Max exponential backoff is 2^12, x4000 (50ms -> 200s)
- self._bootstrap_fails = min(self._bootstrap_fails + 1, 12)
- self._refresh_on_disconnects = True
+ return False
+ return True
def _can_connect(self, node_id):
if node_id not in self._conns:
- if self.cluster.broker_metadata(node_id):
+ # cluster.broker_metadata() is stateful when called w/ 'bootstrap'
+ # (it cycles through all of the bootstrap servers)
+ # so we short-circuit here and assume that we should always have
+ # some bootstrap_servers config to power bootstrap broker_metadata
+ if node_id == 'bootstrap' or self.cluster.broker_metadata(node_id):
return True
return False
conn = self._conns[node_id]
@@ -308,6 +266,9 @@ class KafkaClient(object):
except KeyError:
self._selector.modify(conn._sock, selectors.EVENT_WRITE)
+ if node_id == 'bootstrap':
+ self._last_bootstrap = time.time()
+
elif conn.connected():
log.debug("Node %s connected", node_id)
if node_id in self._connecting:
@@ -323,12 +284,12 @@ class KafkaClient(object):
self._idle_expiry_manager.update(node_id)
- if 'bootstrap' in self._conns and node_id != 'bootstrap':
+ if node_id == 'bootstrap':
+ self._bootstrap_fails = 0
+
+ elif 'bootstrap' in self._conns:
bootstrap = self._conns.pop('bootstrap')
- # XXX: make conn.close() require error to cause refresh
- self._refresh_on_disconnects = False
bootstrap.close()
- self._refresh_on_disconnects = True
# Connection failures imply that our metadata is stale, so let's refresh
elif conn.state is ConnectionStates.DISCONNECTING:
@@ -347,7 +308,10 @@ class KafkaClient(object):
idle_disconnect = True
self._idle_expiry_manager.remove(node_id)
- if self._refresh_on_disconnects and not self._closed and not idle_disconnect:
+ if node_id == 'bootstrap':
+ self._bootstrap_fails += 1
+
+ elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
@@ -362,13 +326,40 @@ class KafkaClient(object):
return True
return False
+ def _should_recycle_connection(self, conn):
+ # Never recycle unless disconnected
+ if not conn.disconnected():
+ return False
+
+        # Always recycle disconnected bootstraps
+ elif conn.node_id == 'bootstrap':
+ return True
+
+ # Otherwise, only recycle when broker metadata has changed
+ broker = self.cluster.broker_metadata(conn.node_id)
+ if broker is None:
+ return False
+
+ host, _, afi = get_ip_port_afi(broker.host)
+ if conn.host != host or conn.port != broker.port:
+ log.info("Broker metadata change detected for node %s"
+ " from %s:%s to %s:%s", conn.node_id, conn.host, conn.port,
+ broker.host, broker.port)
+ return True
+
+ return False
+
def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
with self._lock:
- broker = self.cluster.broker_metadata(node_id)
conn = self._conns.get(node_id)
if conn is None:
+ # Note that when bootstrapping, each call to broker_metadata may
+ # return a different host/port. So we need to be careful to only
+ # call when necessary to avoid skipping some possible bootstrap
+ # source.
+ broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % (node_id,)
log.debug("Initiating connection to node %s at %s:%s",
@@ -382,17 +373,9 @@ class KafkaClient(object):
self._conns[node_id] = conn
# Check if existing connection should be recreated because host/port changed
- elif conn.disconnected() and broker is not None:
- host, _, __ = get_ip_port_afi(broker.host)
- if conn.host != host or conn.port != broker.port:
- log.info("Broker metadata change detected for node %s"
- " from %s:%s to %s:%s", node_id, conn.host, conn.port,
- broker.host, broker.port)
-
- # Drop old connection object.
- # It will be recreated on next _maybe_connect
- self._conns.pop(node_id)
- return False
+ elif self._should_recycle_connection(conn):
+ self._conns.pop(node_id)
+ return False
elif conn.connected():
return True
@@ -713,7 +696,8 @@ class KafkaClient(object):
This method will prefer a node with an existing connection and no
in-flight-requests. If no such node is found, a node will be chosen
randomly from disconnected nodes that are not "blacked out" (i.e.,
- are not subject to a reconnect backoff).
+ are not subject to a reconnect backoff). If no node metadata has been
+ obtained, will return 'bootstrap' (subject to exponential backoff).
Returns:
node_id or None if no suitable node was found
@@ -740,12 +724,8 @@ class KafkaClient(object):
if found is not None:
return found
- # some broker versions return an empty list of broker metadata
- # if there are no topics created yet. the bootstrap process
- # should detect this and keep a 'bootstrap' node alive until
- # a non-bootstrap node is connected and non-empty broker
- # metadata is available
- elif 'bootstrap' in self._conns:
+ elif not nodes and self._can_bootstrap():
+ self._last_bootstrap = time.time()
return 'bootstrap'
return None
@@ -805,6 +785,9 @@ class KafkaClient(object):
if self._can_send_request(node_id):
topics = list(self._topics)
+ if not topics and node_id == 'bootstrap':
+ topics = list(self.config['bootstrap_topics_filter'])
+
if self.cluster.need_all_topic_metadata or not topics:
topics = [] if self.config['api_version'] < (0, 10) else None
api_version = 0 if self.config['api_version'] < (0, 10) else 1
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 8078eb7..3d57ed2 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -9,6 +9,7 @@ import time
from kafka.vendor import six
from kafka import errors as Errors
+from kafka.conn import collect_hosts, dns_lookup
from kafka.future import Future
from kafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition
@@ -29,10 +30,17 @@ class ClusterMetadata(object):
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new
brokers or partitions. Default: 300000
+ bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+ strings) that the client should contact to bootstrap initial
+ cluster metadata. This does not have to be the full node list.
+ It just needs to have at least one broker that will respond to a
+ Metadata API Request. Default port is 9092. If no servers are
+ specified, will default to localhost:9092.
"""
DEFAULT_CONFIG = {
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
+ 'bootstrap_servers': 'localhost',
}
def __init__(self, **configs):
@@ -42,7 +50,7 @@ class ClusterMetadata(object):
self._groups = {} # group_name -> node_id
self._last_refresh_ms = 0
self._last_successful_refresh_ms = 0
- self._need_update = False
+ self._need_update = True
self._future = None
self._listeners = set()
self._lock = threading.Lock()
@@ -56,6 +64,17 @@ class ClusterMetadata(object):
if key in configs:
self.config[key] = configs[key]
+ self._bootstrap_brokers = self._generate_bootstrap_brokers()
+
+ def _generate_bootstrap_brokers(self):
+ # collect_hosts does not perform DNS, so we should be fine to re-use
+ bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
+
+ while True:
+ for host, port, afi in bootstrap_hosts:
+ for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
+ yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
+
def brokers(self):
"""Get all BrokerMetadata
@@ -73,6 +92,9 @@ class ClusterMetadata(object):
Returns:
BrokerMetadata or None if not found
"""
+ if broker_id == 'bootstrap':
+ return next(self._bootstrap_brokers)
+
return self._brokers.get(broker_id)
def partitions_for_topic(self, topic):
diff --git a/test/test_client_async.py b/test/test_client_async.py
index a4dc9db..3588423 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -23,58 +23,34 @@ from kafka.structs import BrokerMetadata
@pytest.fixture
-def cli(conn):
- return KafkaClient(api_version=(0, 9))
-
-
-@pytest.mark.parametrize("bootstrap,expected_hosts", [
- (None, [('localhost', 9092, socket.AF_UNSPEC)]),
- ('foobar:1234', [('foobar', 1234, socket.AF_UNSPEC)]),
- ('fizzbuzz', [('fizzbuzz', 9092, socket.AF_UNSPEC)]),
- ('foo:12,bar:34', [('foo', 12, socket.AF_UNSPEC), ('bar', 34, socket.AF_UNSPEC)]),
- (['fizz:56', 'buzz'], [('fizz', 56, socket.AF_UNSPEC), ('buzz', 9092, socket.AF_UNSPEC)]),
-])
-def test_bootstrap_servers(mocker, bootstrap, expected_hosts):
- mocker.patch.object(KafkaClient, '_bootstrap')
- if bootstrap is None:
- KafkaClient(api_version=(0, 9)) # pass api_version to skip auto version checks
- else:
- KafkaClient(bootstrap_servers=bootstrap, api_version=(0, 9))
-
- # host order is randomized internally, so resort before testing
- (hosts,), _ = KafkaClient._bootstrap.call_args # pylint: disable=no-member
- assert sorted(hosts) == sorted(expected_hosts)
+def cli(mocker, conn):
+ mocker.patch('kafka.cluster.dns_lookup',
+ return_value=[(socket.AF_INET, None, None, None, ('localhost', 9092))])
+ client = KafkaClient(api_version=(0, 9))
+ client.poll(future=client.cluster.request_update())
+ return client
-def test_bootstrap_success(conn):
+def test_bootstrap(mocker, conn):
conn.state = ConnectionStates.CONNECTED
+ mocker.patch('kafka.cluster.dns_lookup',
+ return_value=[(socket.AF_INET, None, None, None, ('localhost', 9092))])
cli = KafkaClient(api_version=(0, 9))
+ future = cli.cluster.request_update()
+ cli.poll(future=future)
+
+ assert future.succeeded()
args, kwargs = conn.call_args
assert args == ('localhost', 9092, socket.AF_UNSPEC)
kwargs.pop('state_change_callback')
kwargs.pop('node_id')
assert kwargs == cli.config
- conn.connect_blocking.assert_called_with()
- conn.send.assert_called_once_with(MetadataRequest[0]([]))
+ conn.send.assert_called_once_with(MetadataRequest[0]([]), blocking=False)
assert cli._bootstrap_fails == 0
assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None),
BrokerMetadata(1, 'bar', 34, None)])
-def test_bootstrap_failure(conn):
- conn.connect_blocking.return_value = False
- cli = KafkaClient(api_version=(0, 9))
- args, kwargs = conn.call_args
- assert args == ('localhost', 9092, socket.AF_UNSPEC)
- kwargs.pop('state_change_callback')
- kwargs.pop('node_id')
- assert kwargs == cli.config
- conn.connect_blocking.assert_called_with()
- conn.close.assert_called_with()
- assert cli._bootstrap_fails == 1
- assert cli.cluster.brokers() == set()
-
-
def test_can_connect(cli, conn):
# Node is not in broker metadata - can't connect
assert not cli._can_connect(2)
@@ -187,22 +163,26 @@ def test_is_ready(mocker, cli, conn):
def test_close(mocker, cli, conn):
mocker.patch.object(cli, '_selector')
- # bootstrap connection should have been closed
- assert conn.close.call_count == 1
+ call_count = conn.close.call_count
# Unknown node - silent
cli.close(2)
+ call_count += 0
+ assert conn.close.call_count == call_count
# Single node close
cli._maybe_connect(0)
- assert conn.close.call_count == 1
+ assert conn.close.call_count == call_count
cli.close(0)
- assert conn.close.call_count == 2
+ call_count += 1
+ assert conn.close.call_count == call_count
# All node close
cli._maybe_connect(1)
cli.close()
- assert conn.close.call_count == 4
+ # +3 close: node 0, node 1, node bootstrap
+ call_count += 3
+ assert conn.close.call_count == call_count
def test_is_disconnected(cli, conn):
@@ -249,7 +229,6 @@ def test_send(cli, conn):
def test_poll(mocker):
- mocker.patch.object(KafkaClient, '_bootstrap')
metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata')
_poll = mocker.patch.object(KafkaClient, '_poll')
cli = KafkaClient(api_version=(0, 9))
@@ -309,7 +288,6 @@ def test_set_topics(mocker):
@pytest.fixture
def client(mocker):
- mocker.patch.object(KafkaClient, '_bootstrap')
_poll = mocker.patch.object(KafkaClient, '_poll')
cli = KafkaClient(request_timeout_ms=9999999,