summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-04 22:52:00 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-05 00:16:15 -0700
commit87da3f00800c381f1fc1a5ca543659d58f90e66d (patch)
tree46fca61412205b12ff2f723c0d39dcb0513cb141 /test
parentd81963a919fa8161c94b5bef5e6de0697b91c4a6 (diff)
downloadkafka-python-87da3f00800c381f1fc1a5ca543659d58f90e66d.tar.gz
Add BrokerConnection.send tests
Diffstat (limited to 'test')
-rw-r--r--test/test_conn.py111
1 files changed, 102 insertions, 9 deletions
diff --git a/test/test_conn.py b/test/test_conn.py
index d394f74..5432ebd 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -2,12 +2,15 @@
from __future__ import absolute_import
from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET
-import socket
import time
import pytest
from kafka.conn import BrokerConnection, ConnectionStates
+from kafka.protocol.api import RequestHeader
+from kafka.protocol.metadata import MetadataRequest
+
+import kafka.common as Errors
@pytest.fixture
@@ -20,6 +23,7 @@ def socket(mocker):
@pytest.fixture
def conn(socket):
+ from socket import AF_INET
conn = BrokerConnection('localhost', 9092, socket.AF_INET)
return conn
@@ -61,22 +65,111 @@ def test_connect_timeout(socket, conn):
def test_blacked_out(conn):
- assert not conn.blacked_out()
+ assert conn.blacked_out() is False
conn.last_attempt = time.time()
- assert conn.blacked_out()
+ assert conn.blacked_out() is True
def test_connected(conn):
- assert not conn.connected()
+ assert conn.connected() is False
conn.state = ConnectionStates.CONNECTED
- assert conn.connected()
+ assert conn.connected() is True
def test_connecting(conn):
- assert not conn.connecting()
+ assert conn.connecting() is False
+ conn.state = ConnectionStates.CONNECTING
+ assert conn.connecting() is True
+ conn.state = ConnectionStates.CONNECTED
+ assert conn.connecting() is False
+
+
+def test_send_disconnected(conn):
+ conn.state = ConnectionStates.DISCONNECTED
+ f = conn.send('foobar')
+ assert f.failed() is True
+ assert isinstance(f.exception, Errors.ConnectionError)
+
+
+def test_send_connecting(conn):
conn.state = ConnectionStates.CONNECTING
- assert conn.connecting()
+ f = conn.send('foobar')
+ assert f.failed() is True
+ assert isinstance(f.exception, Errors.NodeNotReadyError)
+
+
+def test_send_max_ifr(conn):
conn.state = ConnectionStates.CONNECTED
- assert not conn.connecting()
+ max_ifrs = conn.config['max_in_flight_requests_per_connection']
+ for _ in range(max_ifrs):
+ conn.in_flight_requests.append('foo')
+ f = conn.send('foobar')
+ assert f.failed() is True
+ assert isinstance(f.exception, Errors.TooManyInFlightRequests)
+
+
+def test_send_no_response(socket, conn):
+ conn.connect()
+ assert conn.state is ConnectionStates.CONNECTED
+ req = MetadataRequest([])
+ header = RequestHeader(req, client_id=conn.config['client_id'])
+ payload_bytes = len(header.encode()) + len(req.encode())
+ third = payload_bytes // 3
+ remainder = payload_bytes % 3
+ socket.send.side_effect = [4, third, third, third, remainder]
+
+ assert len(conn.in_flight_requests) == 0
+ f = conn.send(req, expect_response=False)
+ assert f.succeeded() is True
+ assert f.value is None
+ assert len(conn.in_flight_requests) == 0
+
+
+def test_send_response(socket, conn):
+ conn.connect()
+ assert conn.state is ConnectionStates.CONNECTED
+ req = MetadataRequest([])
+ header = RequestHeader(req, client_id=conn.config['client_id'])
+ payload_bytes = len(header.encode()) + len(req.encode())
+ third = payload_bytes // 3
+ remainder = payload_bytes % 3
+ socket.send.side_effect = [4, third, third, third, remainder]
+
+ assert len(conn.in_flight_requests) == 0
+ f = conn.send(req)
+ assert f.is_done is False
+ assert len(conn.in_flight_requests) == 1
+
+
+def test_send_error(socket, conn):
+ conn.connect()
+ assert conn.state is ConnectionStates.CONNECTED
+ req = MetadataRequest([])
+ header = RequestHeader(req, client_id=conn.config['client_id'])
+ try:
+ error = ConnectionError
+ except NameError:
+ from socket import error
+ socket.send.side_effect = error
+ f = conn.send(req)
+ assert f.failed() is True
+ assert isinstance(f.exception, Errors.ConnectionError)
+ assert socket.close.call_count == 1
+ assert conn.state is ConnectionStates.DISCONNECTED
+
+
+def test_can_send_more(conn):
+ assert conn.can_send_more() is True
+ max_ifrs = conn.config['max_in_flight_requests_per_connection']
+ for _ in range(max_ifrs):
+ assert conn.can_send_more() is True
+ conn.in_flight_requests.append('foo')
+ assert conn.can_send_more() is False
+
+
+def test_recv(socket, conn):
+ pass # TODO
+
-# TODO: test_send, test_recv, test_can_send_more, test_close
+def test_close(conn):
+ pass # TODO