summaryrefslogtreecommitdiff
path: root/test/test_client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-09 18:43:46 -0800
committerDana Powers <dana.powers@rd.io>2016-01-09 18:54:08 -0800
commit651454a074114d804fc0517dff8d5cf884284594 (patch)
tree6c063e92606be44286531de2fd0ad7b62120196b /test/test_client_async.py
parentcc22d1bab82fd234f2a47d347152a321aaa0b53e (diff)
downloadkafka-python-651454a074114d804fc0517dff8d5cf884284594.tar.gz
Fill out more async client unit tests
Diffstat (limited to 'test/test_client_async.py')
-rw-r--r--test/test_client_async.py183
1 files changed, 165 insertions, 18 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py
index aa8ff11..447ea49 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -3,9 +3,11 @@ import pytest
from kafka.client_async import KafkaClient
from kafka.common import BrokerMetadata
+import kafka.common as Errors
from kafka.conn import ConnectionStates
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse, MetadataRequest
+from kafka.protocol.produce import ProduceRequest
@pytest.mark.parametrize("bootstrap,expected_hosts", [
@@ -36,6 +38,8 @@ def conn(mocker):
MetadataResponse(
[(0, 'foo', 12), (1, 'bar', 34)], # brokers
[])) # topics
+ conn.blacked_out.return_value = False
+ conn.connect.return_value = conn.state
return conn
@@ -59,40 +63,183 @@ def test_bootstrap_failure(conn):
assert cli.cluster.brokers() == set()
-def test_can_connect():
- pass
+def test_can_connect(conn):
+ cli = KafkaClient()
+ # Node is not in broker metadata - cant connect
+ assert not cli._can_connect(2)
-def test_initiate_connect():
- pass
+ # Node is in broker metadata but not in _conns
+ assert 0 not in cli._conns
+ assert cli._can_connect(0)
+ # Node is connected, can't reconnect
+ cli._initiate_connect(0)
+ assert not cli._can_connect(0)
-def test_finish_connect():
- pass
+ # Node is disconnected, can connect
+ cli._conns[0].state = ConnectionStates.DISCONNECTED
+ assert cli._can_connect(0)
+ # Node is disconnected, but blacked out
+ conn.blacked_out.return_value = True
+ assert not cli._can_connect(0)
-def test_ready():
- pass
+def test_initiate_connect(conn):
+ cli = KafkaClient()
+ try:
+ # Node not in metadata, raises AssertionError
+ cli._initiate_connect(2)
+ except AssertionError:
+ pass
+ else:
+ assert False, 'Exception not raised'
+ assert 0 not in cli._conns
+ state = cli._initiate_connect(0)
+ assert cli._conns[0] is conn
+ assert state is conn.state
-def test_close():
- pass
+def test_finish_connect(conn):
+ cli = KafkaClient()
+ try:
+ # Node not in metadata, raises AssertionError
+ cli._initiate_connect(2)
+ except AssertionError:
+ pass
+ else:
+ assert False, 'Exception not raised'
-def test_is_disconnected():
- pass
+ assert 0 not in cli._conns
+ cli._initiate_connect(0)
+ conn.connect.return_value = ConnectionStates.CONNECTING
+ state = cli._finish_connect(0)
+ assert 0 in cli._connecting
+ assert state is ConnectionStates.CONNECTING
-def test_is_ready():
- pass
+ conn.connect.return_value = ConnectionStates.CONNECTED
+ state = cli._finish_connect(0)
+ assert 0 not in cli._connecting
+ assert state is ConnectionStates.CONNECTED
+ # Failure to connect should trigger metadata update
+ assert not cli.cluster._need_update
+ cli._connecting.add(0)
+ conn.connect.return_value = ConnectionStates.DISCONNECTED
+ state = cli._finish_connect(0)
+ assert 0 not in cli._connecting
+ assert state is ConnectionStates.DISCONNECTED
+ assert cli.cluster._need_update
-def test_can_send_request():
- pass
+def test_ready(conn):
+ cli = KafkaClient()
-def test_send():
- pass
+ # Node not in metadata
+ assert not cli.ready(2)
+
+ # Node in metadata will connect
+ assert 0 not in cli._conns
+ assert cli.ready(0)
+ assert 0 in cli._conns
+ assert cli._conns[0].state is ConnectionStates.CONNECTED
+
+ # metadata refresh blocks ready nodes
+ assert cli.ready(0)
+ assert cli.ready(1)
+ cli._metadata_refresh_in_progress = True
+ assert not cli.ready(0)
+ assert not cli.ready(1)
+
+ # requesting metadata update also blocks ready nodes
+ cli._metadata_refresh_in_progress = False
+ assert cli.ready(0)
+ assert cli.ready(1)
+ cli.cluster.request_update()
+ cli.cluster.config['retry_backoff_ms'] = 0
+ assert not cli._metadata_refresh_in_progress
+ assert not cli.ready(0)
+ assert not cli.ready(1)
+ cli.cluster._need_update = False
+
+ # if connection can't send more, not ready
+ assert cli.ready(0)
+ assert cli.ready(1)
+ conn.can_send_more.return_value = False
+ assert not cli.ready(0)
+ conn.can_send_more.return_value = True
+
+ # disconnected nodes, not ready
+ assert cli.ready(0)
+ assert cli.ready(1)
+ conn.connected.return_value = False
+ assert not cli.ready(0)
+ conn.connected.return_value = True
+
+ # connecting node connects
+ cli._connecting.add(0)
+ conn.connected.return_value = False
+ cli.ready(0)
+ assert 0 not in cli._connecting
+ assert cli._conns[0].connect.called_with()
+
+
+def test_close(conn):
+ cli = KafkaClient()
+
+ # Unknown node - silent
+ cli.close(2)
+
+ # Single node close
+ cli._initiate_connect(0)
+ assert not conn.close.call_count
+ cli.close(0)
+ assert conn.close.call_count == 1
+
+ # All node close
+ cli._initiate_connect(1)
+ cli.close()
+ assert conn.close.call_count == 3
+
+
+def test_is_disconnected(conn):
+ cli = KafkaClient()
+
+ # False if not connected yet
+ conn.state = ConnectionStates.DISCONNECTED
+ assert not cli.is_disconnected(0)
+
+ cli._initiate_connect(0)
+ assert cli.is_disconnected(0)
+
+ conn.state = ConnectionStates.CONNECTING
+ assert not cli.is_disconnected(0)
+
+ conn.state = ConnectionStates.CONNECTED
+ assert not cli.is_disconnected(0)
+
+
+def test_send(conn):
+ cli = KafkaClient()
+ try:
+ cli.send(2, None)
+ except Errors.NodeNotReadyError:
+ pass
+ else:
+ assert False, 'NodeNotReadyError not raised'
+
+ cli._initiate_connect(0)
+ # ProduceRequest w/ 0 required_acks -> no response
+ request = ProduceRequest(0, 0, [])
+ ret = cli.send(0, request)
+ assert conn.send.called_with(request, expect_response=False)
+ assert isinstance(ret, Future)
+
+ request = MetadataRequest([])
+ cli.send(0, request)
+ assert conn.send.called_with(request, expect_response=True)
def test_poll():