diff options
-rw-r--r-- | test/test_client.py | 19 | ||||
-rw-r--r-- | test/test_conn.py | 66 | ||||
-rw-r--r-- | test/test_conn_legacy.py | 67 |
3 files changed, 74 insertions, 78 deletions
diff --git a/test/test_client.py b/test/test_client.py index 4b5a3a8..660af61 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1,12 +1,10 @@ import socket -from time import sleep from mock import ANY, MagicMock, patch import six from . import unittest from kafka import SimpleClient -from kafka.conn import KafkaConnection from kafka.errors import ( KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError, UnknownTopicOrPartitionError, ConnectionError, FailedPayloadsError) @@ -15,7 +13,6 @@ from kafka.protocol import KafkaProtocol, create_message from kafka.protocol.metadata import MetadataResponse from kafka.structs import ProduceRequestPayload, BrokerMetadata, TopicPartition -from test.testutil import Timer NO_ERROR = 0 UNKNOWN_TOPIC_OR_PARTITION = 3 @@ -91,7 +88,7 @@ class TestSimpleClient(unittest.TestCase): ('kafka02', 9092): MagicMock(), ('kafka03', 9092): MagicMock() } - # inject KafkaConnection side effects + # inject BrokerConnection side effects mock_conn(mocked_conns[('kafka01', 9092)], success=False) mock_conn(mocked_conns[('kafka03', 9092)], success=False) future = Future() @@ -389,19 +386,6 @@ class TestSimpleClient(unittest.TestCase): with self.assertRaises(FailedPayloadsError): client.send_produce_request(requests) - def test_timeout(self): - def _timeout(*args, **kwargs): - timeout = args[1] - sleep(timeout) - raise socket.timeout - - with patch.object(socket, "create_connection", side_effect=_timeout): - - with Timer() as t: - with self.assertRaises(ConnectionError): - KafkaConnection("nowhere", 1234, 1.0) - self.assertGreaterEqual(t.interval, 1.0) - def test_correlation_rollover(self): with patch.object(SimpleClient, 'load_metadata_for_topics'): big_num = 2**31 - 3 @@ -409,4 +393,3 @@ class TestSimpleClient(unittest.TestCase): self.assertEqual(big_num + 1, client._next_id()) self.assertEqual(big_num + 2, client._next_id()) self.assertEqual(0, client._next_id()) - diff --git a/test/test_conn.py b/test/test_conn.py index 6a3b154..4f2b12f 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -7,7 +7,7 @@ import time import pytest -from kafka.conn import BrokerConnection, ConnectionStates +from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts from kafka.protocol.api import RequestHeader from kafka.protocol.metadata import MetadataRequest @@ -29,14 +29,14 @@ def conn(_socket): @pytest.mark.parametrize("states", [ - (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),), - (([EALREADY, EALREADY], ConnectionStates.CONNECTING),), - (([0], ConnectionStates.CONNECTED),), - (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING), - ([ECONNRESET], ConnectionStates.DISCONNECTED)), - (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING), - ([EALREADY], ConnectionStates.CONNECTING), - ([EISCONN], ConnectionStates.CONNECTED)), + (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),), + (([EALREADY, EALREADY], ConnectionStates.CONNECTING),), + (([0], ConnectionStates.CONNECTED),), + (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING), + ([ECONNRESET], ConnectionStates.DISCONNECTED)), + (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING), + ([EALREADY], ConnectionStates.CONNECTING), + ([EISCONN], ConnectionStates.CONNECTED)), ]) def test_connect(_socket, conn, states): assert conn.state is ConnectionStates.DISCONNECTED @@ -216,3 +216,51 @@ def test_recv(_socket, conn): def test_close(conn): pass # TODO + + +def test_collect_hosts__happy_path(): + hosts = "127.0.0.1:1234,127.0.0.1" + results = collect_hosts(hosts) + assert set(results) == set([ + ('127.0.0.1', 1234, socket.AF_INET), + ('127.0.0.1', 9092, socket.AF_INET), + ]) + + +def test_collect_hosts__ipv6(): + hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234" + results = collect_hosts(hosts) + assert set(results) == set([ + ('localhost', 1234, socket.AF_INET6), + ('2001:1000:2000::1', 9092, socket.AF_INET6), + ('2001:1000:2000::1', 1234, socket.AF_INET6), + ]) + + +def test_collect_hosts__string_list(): + hosts = [ + 'localhost:1234', + 'localhost', + '[localhost]', + '2001::1', + '[2001::1]', + '[2001::1]:1234', + ] + results = collect_hosts(hosts) + assert set(results) == set([ + ('localhost', 1234, socket.AF_UNSPEC), + ('localhost', 9092, socket.AF_UNSPEC), + ('localhost', 9092, socket.AF_INET6), + ('2001::1', 9092, socket.AF_INET6), + ('2001::1', 9092, socket.AF_INET6), + ('2001::1', 1234, socket.AF_INET6), + ]) + + +def test_collect_hosts__with_spaces(): + hosts = "localhost:1234, localhost" + results = collect_hosts(hosts) + assert set(results) == set([ + ('localhost', 1234, socket.AF_UNSPEC), + ('localhost', 9092, socket.AF_UNSPEC), + ]) diff --git a/test/test_conn_legacy.py b/test/test_conn_legacy.py index 820c4e7..ca3b17a 100644 --- a/test/test_conn_legacy.py +++ b/test/test_conn_legacy.py @@ -1,12 +1,14 @@ import socket import struct from threading import Thread +import time import mock from . import unittest from kafka.errors import ConnectionError -from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS +from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS +from test.testutil import Timer class ConnTest(unittest.TestCase): @@ -47,56 +49,6 @@ class ConnTest(unittest.TestCase): # Reset any mock counts caused by __init__ self.MockCreateConn.reset_mock() - def test_collect_hosts__happy_path(self): - hosts = "127.0.0.1:1234,127.0.0.1" - results = collect_hosts(hosts) - - self.assertEqual(set(results), set([ - ('127.0.0.1', 1234, socket.AF_INET), - ('127.0.0.1', 9092, socket.AF_INET), - ])) - - def test_collect_hosts__ipv6(self): - hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234" - results = collect_hosts(hosts) - - self.assertEqual(set(results), set([ - ('localhost', 1234, socket.AF_INET6), - ('2001:1000:2000::1', 9092, socket.AF_INET6), - ('2001:1000:2000::1', 1234, socket.AF_INET6), - ])) - - def test_collect_hosts__string_list(self): - hosts = [ - 'localhost:1234', - 'localhost', - '[localhost]', - '2001::1', - '[2001::1]', - '[2001::1]:1234', - ] - - results = collect_hosts(hosts) - - self.assertEqual(set(results), set([ - ('localhost', 1234, socket.AF_UNSPEC), - ('localhost', 9092, socket.AF_UNSPEC), - ('localhost', 9092, socket.AF_INET6), - ('2001::1', 9092, socket.AF_INET6), - ('2001::1', 9092, socket.AF_INET6), - ('2001::1', 1234, socket.AF_INET6), - ])) - - def test_collect_hosts__with_spaces(self): - hosts = "localhost:1234, localhost" - results = collect_hosts(hosts) - - self.assertEqual(set(results), set([ - ('localhost', 1234, socket.AF_UNSPEC), - ('localhost', 9092, socket.AF_UNSPEC), - ])) - - def test_send(self): self.conn.send(self.config['request_id'], self.config['payload']) self.conn._sock.sendall.assert_called_with(self.config['payload']) @@ -243,3 +195,16 @@ class TestKafkaConnection(unittest.TestCase): self.assertEqual(err, [None]) self.assertEqual(socket.call_count, 2) + + def test_timeout(self): + def _timeout(*args, **kwargs): + timeout = args[1] + time.sleep(timeout) + raise socket.timeout + + with mock.patch.object(socket, "create_connection", side_effect=_timeout): + + with Timer() as t: + with self.assertRaises(ConnectionError): + KafkaConnection("nowhere", 1234, 1.0) + self.assertGreaterEqual(t.interval, 1.0) |