diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-04 22:52:00 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-05 00:16:15 -0700 |
commit | 87da3f00800c381f1fc1a5ca543659d58f90e66d (patch) | |
tree | 46fca61412205b12ff2f723c0d39dcb0513cb141 /test | |
parent | d81963a919fa8161c94b5bef5e6de0697b91c4a6 (diff) | |
download | kafka-python-87da3f00800c381f1fc1a5ca543659d58f90e66d.tar.gz |
Add BrokerConnection.send tests
Diffstat (limited to 'test')
-rw-r--r-- | test/test_conn.py | 111 |
1 files changed, 102 insertions, 9 deletions
diff --git a/test/test_conn.py b/test/test_conn.py index d394f74..5432ebd 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -2,12 +2,15 @@ from __future__ import absolute_import from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET -import socket import time import pytest from kafka.conn import BrokerConnection, ConnectionStates +from kafka.protocol.api import RequestHeader +from kafka.protocol.metadata import MetadataRequest + +import kafka.common as Errors @pytest.fixture @@ -20,6 +23,7 @@ def socket(mocker): @pytest.fixture def conn(socket): + from socket import AF_INET conn = BrokerConnection('localhost', 9092, socket.AF_INET) return conn @@ -61,22 +65,111 @@ def test_connect_timeout(socket, conn): def test_blacked_out(conn): - assert not conn.blacked_out() + assert conn.blacked_out() is False conn.last_attempt = time.time() - assert conn.blacked_out() + assert conn.blacked_out() is True def test_connected(conn): - assert not conn.connected() + assert conn.connected() is False conn.state = ConnectionStates.CONNECTED - assert conn.connected() + assert conn.connected() is True def test_connecting(conn): - assert not conn.connecting() + assert conn.connecting() is False + conn.state = ConnectionStates.CONNECTING + assert conn.connecting() is True + conn.state = ConnectionStates.CONNECTED + assert conn.connecting() is False + + +def test_send_disconnected(conn): + conn.state = ConnectionStates.DISCONNECTED + f = conn.send('foobar') + assert f.failed() is True + assert isinstance(f.exception, Errors.ConnectionError) + + +def test_send_connecting(conn): conn.state = ConnectionStates.CONNECTING - assert conn.connecting() + f = conn.send('foobar') + assert f.failed() is True + assert isinstance(f.exception, Errors.NodeNotReadyError) + + +def test_send_max_ifr(conn): conn.state = ConnectionStates.CONNECTED - assert not conn.connecting() + max_ifrs = conn.config['max_in_flight_requests_per_connection'] + for _ in range(max_ifrs): + conn.in_flight_requests.append('foo') + f = conn.send('foobar') + assert f.failed() is True + assert isinstance(f.exception, Errors.TooManyInFlightRequests) + + +def test_send_no_response(socket, conn): + conn.connect() + assert conn.state is ConnectionStates.CONNECTED + req = MetadataRequest([]) + header = RequestHeader(req, client_id=conn.config['client_id']) + payload_bytes = len(header.encode()) + len(req.encode()) + third = payload_bytes // 3 + remainder = payload_bytes % 3 + socket.send.side_effect = [4, third, third, third, remainder] + + assert len(conn.in_flight_requests) == 0 + f = conn.send(req, expect_response=False) + assert f.succeeded() is True + assert f.value is None + assert len(conn.in_flight_requests) == 0 + + +def test_send_response(socket, conn): + conn.connect() + assert conn.state is ConnectionStates.CONNECTED + req = MetadataRequest([]) + header = RequestHeader(req, client_id=conn.config['client_id']) + payload_bytes = len(header.encode()) + len(req.encode()) + third = payload_bytes // 3 + remainder = payload_bytes % 3 + socket.send.side_effect = [4, third, third, third, remainder] + + assert len(conn.in_flight_requests) == 0 + f = conn.send(req) + assert f.is_done is False + assert len(conn.in_flight_requests) == 1 + + +def test_send_error(socket, conn): + conn.connect() + assert conn.state is ConnectionStates.CONNECTED + req = MetadataRequest([]) + header = RequestHeader(req, client_id=conn.config['client_id']) + try: + error = ConnectionError + except NameError: + from socket import error + socket.send.side_effect = error + f = conn.send(req) + assert f.failed() is True + assert isinstance(f.exception, Errors.ConnectionError) + assert socket.close.call_count == 1 + assert conn.state is ConnectionStates.DISCONNECTED + + +def test_can_send_more(conn): + assert conn.can_send_more() is True + max_ifrs = conn.config['max_in_flight_requests_per_connection'] + for _ in range(max_ifrs): + assert conn.can_send_more() is True + conn.in_flight_requests.append('foo') + assert conn.can_send_more() is False + + +def test_recv(socket, conn): + pass # TODO + -# TODO: test_send, test_recv, test_can_send_more, test_close +def test_close(conn): + pass # TODO |