diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-05 08:10:00 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-05 08:10:00 -0700 |
commit | 679d7485b1fdf049879099404dcecc8496569d2b (patch) | |
tree | c83a38ee159dadf55b0f88ae579b64f6d4bb1885 | |
parent | d81963a919fa8161c94b5bef5e6de0697b91c4a6 (diff) | |
parent | bb25469fdaf6e0bfe929f12173578e8fdf114094 (diff) | |
download | kafka-python-679d7485b1fdf049879099404dcecc8496569d2b.tar.gz |
Merge pull request #611 from dpkp/sock_send_bytes
Handle partial socket send()
-rw-r--r-- | kafka/conn.py | 10 | ||||
-rw-r--r-- | kafka/future.py | 4 | ||||
-rw-r--r-- | test/test_conn.py | 111 |
3 files changed, 110 insertions, 15 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 2b82b6d..ffc839e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -188,10 +188,12 @@ class BrokerConnection(object): # and send bytes asynchronously. For now, just block # sending each request payload self._sock.setblocking(True) - sent_bytes = self._sock.send(size) - assert sent_bytes == len(size) - sent_bytes = self._sock.send(message) - assert sent_bytes == len(message) + for data in (size, message): + total_sent = 0 + while total_sent < len(data): + sent_bytes = self._sock.send(data[total_sent:]) + total_sent += sent_bytes + assert total_sent == len(data) self._sock.setblocking(False) except (AssertionError, ConnectionError) as e: log.exception("Error sending %s to %s", request, self) diff --git a/kafka/future.py b/kafka/future.py index 06b8c3a..c7e0b14 100644 --- a/kafka/future.py +++ b/kafka/future.py @@ -15,10 +15,10 @@ class Future(object): self._errbacks = [] def succeeded(self): - return self.is_done and not self.exception + return self.is_done and not bool(self.exception) def failed(self): - return self.is_done and self.exception + return self.is_done and bool(self.exception) def retriable(self): try: diff --git a/test/test_conn.py b/test/test_conn.py index d394f74..5432ebd 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -2,12 +2,15 @@ from __future__ import absolute_import from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET -import socket import time import pytest from kafka.conn import BrokerConnection, ConnectionStates +from kafka.protocol.api import RequestHeader +from kafka.protocol.metadata import MetadataRequest + +import kafka.common as Errors @pytest.fixture @@ -20,6 +23,7 @@ def socket(mocker): @pytest.fixture def conn(socket): + from socket import AF_INET conn = BrokerConnection('localhost', 9092, socket.AF_INET) return conn @@ -61,22 +65,111 @@ def test_connect_timeout(socket, conn): def test_blacked_out(conn): - assert not conn.blacked_out() + assert conn.blacked_out() is False conn.last_attempt = time.time() - assert conn.blacked_out() + assert conn.blacked_out() is True def test_connected(conn): - assert not conn.connected() + assert conn.connected() is False conn.state = ConnectionStates.CONNECTED - assert conn.connected() + assert conn.connected() is True def test_connecting(conn): - assert not conn.connecting() + assert conn.connecting() is False + conn.state = ConnectionStates.CONNECTING + assert conn.connecting() is True + conn.state = ConnectionStates.CONNECTED + assert conn.connecting() is False + + +def test_send_disconnected(conn): + conn.state = ConnectionStates.DISCONNECTED + f = conn.send('foobar') + assert f.failed() is True + assert isinstance(f.exception, Errors.ConnectionError) + + +def test_send_connecting(conn): conn.state = ConnectionStates.CONNECTING - assert conn.connecting() + f = conn.send('foobar') + assert f.failed() is True + assert isinstance(f.exception, Errors.NodeNotReadyError) + + +def test_send_max_ifr(conn): conn.state = ConnectionStates.CONNECTED - assert not conn.connecting() + max_ifrs = conn.config['max_in_flight_requests_per_connection'] + for _ in range(max_ifrs): + conn.in_flight_requests.append('foo') + f = conn.send('foobar') + assert f.failed() is True + assert isinstance(f.exception, Errors.TooManyInFlightRequests) + + +def test_send_no_response(socket, conn): + conn.connect() + assert conn.state is ConnectionStates.CONNECTED + req = MetadataRequest([]) + header = RequestHeader(req, client_id=conn.config['client_id']) + payload_bytes = len(header.encode()) + len(req.encode()) + third = payload_bytes // 3 + remainder = payload_bytes % 3 + socket.send.side_effect = [4, third, third, third, remainder] + + assert len(conn.in_flight_requests) == 0 + f = conn.send(req, expect_response=False) + assert f.succeeded() is True + assert f.value is None + assert len(conn.in_flight_requests) == 0 + + +def test_send_response(socket, conn): + conn.connect() + assert conn.state is ConnectionStates.CONNECTED + req = MetadataRequest([]) + header = RequestHeader(req, client_id=conn.config['client_id']) + payload_bytes = len(header.encode()) + len(req.encode()) + third = payload_bytes // 3 + remainder = payload_bytes % 3 + socket.send.side_effect = [4, third, third, third, remainder] + + assert len(conn.in_flight_requests) == 0 + f = conn.send(req) + assert f.is_done is False + assert len(conn.in_flight_requests) == 1 + + +def test_send_error(socket, conn): + conn.connect() + assert conn.state is ConnectionStates.CONNECTED + req = MetadataRequest([]) + header = RequestHeader(req, client_id=conn.config['client_id']) + try: + error = ConnectionError + except NameError: + from socket import error + socket.send.side_effect = error + f = conn.send(req) + assert f.failed() is True + assert isinstance(f.exception, Errors.ConnectionError) + assert socket.close.call_count == 1 + assert conn.state is ConnectionStates.DISCONNECTED + + +def test_can_send_more(conn): + assert conn.can_send_more() is True + max_ifrs = conn.config['max_in_flight_requests_per_connection'] + for _ in range(max_ifrs): + assert conn.can_send_more() is True + conn.in_flight_requests.append('foo') + assert conn.can_send_more() is False + + +def test_recv(socket, conn): + pass # TODO + -# TODO: test_send, test_recv, test_can_send_more, test_close +def test_close(conn): + pass # TODO |