summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/conn.py10
-rw-r--r--kafka/future.py4
-rw-r--r--test/test_conn.py111
3 files changed, 110 insertions, 15 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 2b82b6d..ffc839e 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -188,10 +188,12 @@ class BrokerConnection(object):
# and send bytes asynchronously. For now, just block
# sending each request payload
self._sock.setblocking(True)
- sent_bytes = self._sock.send(size)
- assert sent_bytes == len(size)
- sent_bytes = self._sock.send(message)
- assert sent_bytes == len(message)
+ for data in (size, message):
+ total_sent = 0
+ while total_sent < len(data):
+ sent_bytes = self._sock.send(data[total_sent:])
+ total_sent += sent_bytes
+ assert total_sent == len(data)
self._sock.setblocking(False)
except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)
diff --git a/kafka/future.py b/kafka/future.py
index 06b8c3a..c7e0b14 100644
--- a/kafka/future.py
+++ b/kafka/future.py
@@ -15,10 +15,10 @@ class Future(object):
self._errbacks = []
def succeeded(self):
- return self.is_done and not self.exception
+ return self.is_done and not bool(self.exception)
def failed(self):
- return self.is_done and self.exception
+ return self.is_done and bool(self.exception)
def retriable(self):
try:
diff --git a/test/test_conn.py b/test/test_conn.py
index d394f74..5432ebd 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -2,12 +2,15 @@
from __future__ import absolute_import
from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET
-import socket
import time
import pytest
from kafka.conn import BrokerConnection, ConnectionStates
+from kafka.protocol.api import RequestHeader
+from kafka.protocol.metadata import MetadataRequest
+
+import kafka.common as Errors
@pytest.fixture
@@ -20,6 +23,7 @@ def socket(mocker):
@pytest.fixture
def conn(socket):
+ from socket import AF_INET
conn = BrokerConnection('localhost', 9092, socket.AF_INET)
return conn
@@ -61,22 +65,111 @@ def test_connect_timeout(socket, conn):
def test_blacked_out(conn):
- assert not conn.blacked_out()
+ assert conn.blacked_out() is False
conn.last_attempt = time.time()
- assert conn.blacked_out()
+ assert conn.blacked_out() is True
def test_connected(conn):
- assert not conn.connected()
+ assert conn.connected() is False
conn.state = ConnectionStates.CONNECTED
- assert conn.connected()
+ assert conn.connected() is True
def test_connecting(conn):
- assert not conn.connecting()
+ assert conn.connecting() is False
+ conn.state = ConnectionStates.CONNECTING
+ assert conn.connecting() is True
+ conn.state = ConnectionStates.CONNECTED
+ assert conn.connecting() is False
+
+
+def test_send_disconnected(conn):
+ conn.state = ConnectionStates.DISCONNECTED
+ f = conn.send('foobar')
+ assert f.failed() is True
+ assert isinstance(f.exception, Errors.ConnectionError)
+
+
+def test_send_connecting(conn):
conn.state = ConnectionStates.CONNECTING
- assert conn.connecting()
+ f = conn.send('foobar')
+ assert f.failed() is True
+ assert isinstance(f.exception, Errors.NodeNotReadyError)
+
+
+def test_send_max_ifr(conn):
conn.state = ConnectionStates.CONNECTED
- assert not conn.connecting()
+ max_ifrs = conn.config['max_in_flight_requests_per_connection']
+ for _ in range(max_ifrs):
+ conn.in_flight_requests.append('foo')
+ f = conn.send('foobar')
+ assert f.failed() is True
+ assert isinstance(f.exception, Errors.TooManyInFlightRequests)
+
+
+def test_send_no_response(socket, conn):
+ conn.connect()
+ assert conn.state is ConnectionStates.CONNECTED
+ req = MetadataRequest([])
+ header = RequestHeader(req, client_id=conn.config['client_id'])
+ payload_bytes = len(header.encode()) + len(req.encode())
+ third = payload_bytes // 3
+ remainder = payload_bytes % 3
+ socket.send.side_effect = [4, third, third, third, remainder]
+
+ assert len(conn.in_flight_requests) == 0
+ f = conn.send(req, expect_response=False)
+ assert f.succeeded() is True
+ assert f.value is None
+ assert len(conn.in_flight_requests) == 0
+
+
+def test_send_response(socket, conn):
+ conn.connect()
+ assert conn.state is ConnectionStates.CONNECTED
+ req = MetadataRequest([])
+ header = RequestHeader(req, client_id=conn.config['client_id'])
+ payload_bytes = len(header.encode()) + len(req.encode())
+ third = payload_bytes // 3
+ remainder = payload_bytes % 3
+ socket.send.side_effect = [4, third, third, third, remainder]
+
+ assert len(conn.in_flight_requests) == 0
+ f = conn.send(req)
+ assert f.is_done is False
+ assert len(conn.in_flight_requests) == 1
+
+
+def test_send_error(socket, conn):
+ conn.connect()
+ assert conn.state is ConnectionStates.CONNECTED
+ req = MetadataRequest([])
+ header = RequestHeader(req, client_id=conn.config['client_id'])
+ try:
+ error = ConnectionError
+ except NameError:
+ from socket import error
+ socket.send.side_effect = error
+ f = conn.send(req)
+ assert f.failed() is True
+ assert isinstance(f.exception, Errors.ConnectionError)
+ assert socket.close.call_count == 1
+ assert conn.state is ConnectionStates.DISCONNECTED
+
+
+def test_can_send_more(conn):
+ assert conn.can_send_more() is True
+ max_ifrs = conn.config['max_in_flight_requests_per_connection']
+ for _ in range(max_ifrs):
+ assert conn.can_send_more() is True
+ conn.in_flight_requests.append('foo')
+ assert conn.can_send_more() is False
+
+
+def test_recv(socket, conn):
+ pass # TODO
+
-# TODO: test_send, test_recv, test_can_send_more, test_close
+def test_close(conn):
+ pass # TODO