summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-14 09:39:28 -0700
committerJeff Widman <jeff@jeffwidman.com>2019-03-14 09:39:28 -0700
commit812de351f75beefe73bd9bef55847ab61ccc951d (patch)
treec0122d099cbe0ff0c2c3a1adf76dc09493ae8bcf /test
parent703f06590be2daa7e4592b3d82df6d719a6829bb (diff)
downloadkafka-python-812de351f75beefe73bd9bef55847ab61ccc951d.tar.gz
Retry bootstrapping after backoff when necessary (#1736)
The current client attempts to bootstrap once during initialization, but if it fails there is no second attempt and the client will be inoperable. This can happen, for example, if an entire cluster is down at the time a long-running client starts execution. This commit attempts to fix this by removing the synchronous bootstrapping from `KafkaClient` init, and instead merges bootstrap metadata with the cluster metadata. The Java client uses a similar approach. This allows us to continue falling back to bootstrap data when necessary throughout the life of a long-running consumer or producer. Fix #1670
Diffstat (limited to 'test')
-rw-r--r--test/test_client_async.py68
1 files changed, 23 insertions, 45 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py
index a4dc9db..3588423 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -23,58 +23,34 @@ from kafka.structs import BrokerMetadata
@pytest.fixture
-def cli(conn):
- return KafkaClient(api_version=(0, 9))
-
-
-@pytest.mark.parametrize("bootstrap,expected_hosts", [
- (None, [('localhost', 9092, socket.AF_UNSPEC)]),
- ('foobar:1234', [('foobar', 1234, socket.AF_UNSPEC)]),
- ('fizzbuzz', [('fizzbuzz', 9092, socket.AF_UNSPEC)]),
- ('foo:12,bar:34', [('foo', 12, socket.AF_UNSPEC), ('bar', 34, socket.AF_UNSPEC)]),
- (['fizz:56', 'buzz'], [('fizz', 56, socket.AF_UNSPEC), ('buzz', 9092, socket.AF_UNSPEC)]),
-])
-def test_bootstrap_servers(mocker, bootstrap, expected_hosts):
- mocker.patch.object(KafkaClient, '_bootstrap')
- if bootstrap is None:
- KafkaClient(api_version=(0, 9)) # pass api_version to skip auto version checks
- else:
- KafkaClient(bootstrap_servers=bootstrap, api_version=(0, 9))
-
- # host order is randomized internally, so resort before testing
- (hosts,), _ = KafkaClient._bootstrap.call_args # pylint: disable=no-member
- assert sorted(hosts) == sorted(expected_hosts)
+def cli(mocker, conn):
+ mocker.patch('kafka.cluster.dns_lookup',
+ return_value=[(socket.AF_INET, None, None, None, ('localhost', 9092))])
+ client = KafkaClient(api_version=(0, 9))
+ client.poll(future=client.cluster.request_update())
+ return client
-def test_bootstrap_success(conn):
+def test_bootstrap(mocker, conn):
conn.state = ConnectionStates.CONNECTED
+ mocker.patch('kafka.cluster.dns_lookup',
+ return_value=[(socket.AF_INET, None, None, None, ('localhost', 9092))])
cli = KafkaClient(api_version=(0, 9))
+ future = cli.cluster.request_update()
+ cli.poll(future=future)
+
+ assert future.succeeded()
args, kwargs = conn.call_args
assert args == ('localhost', 9092, socket.AF_UNSPEC)
kwargs.pop('state_change_callback')
kwargs.pop('node_id')
assert kwargs == cli.config
- conn.connect_blocking.assert_called_with()
- conn.send.assert_called_once_with(MetadataRequest[0]([]))
+ conn.send.assert_called_once_with(MetadataRequest[0]([]), blocking=False)
assert cli._bootstrap_fails == 0
assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None),
BrokerMetadata(1, 'bar', 34, None)])
-def test_bootstrap_failure(conn):
- conn.connect_blocking.return_value = False
- cli = KafkaClient(api_version=(0, 9))
- args, kwargs = conn.call_args
- assert args == ('localhost', 9092, socket.AF_UNSPEC)
- kwargs.pop('state_change_callback')
- kwargs.pop('node_id')
- assert kwargs == cli.config
- conn.connect_blocking.assert_called_with()
- conn.close.assert_called_with()
- assert cli._bootstrap_fails == 1
- assert cli.cluster.brokers() == set()
-
-
def test_can_connect(cli, conn):
# Node is not in broker metadata - can't connect
assert not cli._can_connect(2)
@@ -187,22 +163,26 @@ def test_is_ready(mocker, cli, conn):
def test_close(mocker, cli, conn):
mocker.patch.object(cli, '_selector')
- # bootstrap connection should have been closed
- assert conn.close.call_count == 1
+ call_count = conn.close.call_count
# Unknown node - silent
cli.close(2)
+ call_count += 0
+ assert conn.close.call_count == call_count
# Single node close
cli._maybe_connect(0)
- assert conn.close.call_count == 1
+ assert conn.close.call_count == call_count
cli.close(0)
- assert conn.close.call_count == 2
+ call_count += 1
+ assert conn.close.call_count == call_count
# All node close
cli._maybe_connect(1)
cli.close()
- assert conn.close.call_count == 4
+ # +3 close: node 0, node 1, node bootstrap
+ call_count += 3
+ assert conn.close.call_count == call_count
def test_is_disconnected(cli, conn):
@@ -249,7 +229,6 @@ def test_send(cli, conn):
def test_poll(mocker):
- mocker.patch.object(KafkaClient, '_bootstrap')
metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata')
_poll = mocker.patch.object(KafkaClient, '_poll')
cli = KafkaClient(api_version=(0, 9))
@@ -309,7 +288,6 @@ def test_set_topics(mocker):
@pytest.fixture
def client(mocker):
- mocker.patch.object(KafkaClient, '_bootstrap')
_poll = mocker.patch.object(KafkaClient, '_poll')
cli = KafkaClient(request_timeout_ms=9999999,