summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-06 06:44:25 -0800
committerJeff Widman <jeff@jeffwidman.com>2019-03-07 16:52:04 -0800
commit957c62d6ded7a3652e7897db20a23e070a6ad852 (patch)
tree524d8ee0c42ddc40459aea78e3e9cd1508a14675 /test
parent23132863d0e00bd8aabc0e19c7e1822dabfb05b9 (diff)
downloadkafka-python-957c62d6ded7a3652e7897db20a23e070a6ad852.tar.gz
Move all network connection IO into KafkaClient.poll()
Diffstat (limited to 'test')
-rw-r--r--test/fixtures.py7
-rw-r--r--test/test_client_async.py9
2 files changed, 8 insertions, 8 deletions
diff --git a/test/fixtures.py b/test/fixtures.py
index 34373e6..8b156e6 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -405,10 +405,11 @@ class KafkaFixture(Fixture):
retries = 10
while True:
node_id = self._client.least_loaded_node()
- for ready_retry in range(40):
- if self._client.ready(node_id, False):
+ for connect_retry in range(40):
+ self._client.maybe_connect(node_id)
+ if self._client.connected(node_id):
break
- time.sleep(.1)
+ self._client.poll(timeout_ms=100)
else:
raise RuntimeError('Could not connect to broker with node id %d' % (node_id,))
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 09781ac..1c8a50f 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -125,8 +125,7 @@ def test_conn_state_change(mocker, cli, conn):
conn.state = ConnectionStates.CONNECTED
cli._conn_state_change(node_id, conn)
assert node_id not in cli._connecting
- sel.unregister.assert_called_with(conn._sock)
- sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
+ sel.modify.assert_called_with(conn._sock, selectors.EVENT_READ, conn)
# Failure to connect should trigger metadata update
assert cli.cluster._need_update is False
@@ -145,7 +144,7 @@ def test_conn_state_change(mocker, cli, conn):
def test_ready(mocker, cli, conn):
- maybe_connect = mocker.patch.object(cli, '_maybe_connect')
+ maybe_connect = mocker.patch.object(cli, 'maybe_connect')
node_id = 1
cli.ready(node_id)
maybe_connect.assert_called_with(node_id)
@@ -362,6 +361,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
mocker.patch.object(client, '_can_connect', return_value=True)
mocker.patch.object(client, '_maybe_connect', return_value=True)
+ mocker.patch.object(client, 'maybe_connect', return_value=True)
now = time.time()
t = mocker.patch('time.time')
@@ -370,8 +370,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
# first poll attempts connection
client.poll(timeout_ms=12345678)
client._poll.assert_called_with(2.222) # reconnect backoff
- client._can_connect.assert_called_once_with('foobar')
- client._maybe_connect.assert_called_once_with('foobar')
+ client.maybe_connect.assert_called_once_with('foobar')
# poll while connecting should not attempt a new connection
client._connecting.add('foobar')