summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-06-04 16:49:38 -0700
committerDana Powers <dana.powers@gmail.com>2016-06-04 16:49:38 -0700
commit81860eeea1449678fb2d42082e08d1bc40cf1f30 (patch)
tree60adffe20e196f45c69ce0924768a4ebedf901ae
parent2afe09e7c17af4ad311f37f1562b9717d934561c (diff)
downloadkafka-python-81860eeea1449678fb2d42082e08d1bc40cf1f30.tar.gz
Rearrange connection tests to separate legacy KafkaConnection
-rw-r--r--test/test_client.py19
-rw-r--r--test/test_conn.py66
-rw-r--r--test/test_conn_legacy.py67
3 files changed, 74 insertions, 78 deletions
diff --git a/test/test_client.py b/test/test_client.py
index 4b5a3a8..660af61 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -1,12 +1,10 @@
import socket
-from time import sleep
from mock import ANY, MagicMock, patch
import six
from . import unittest
from kafka import SimpleClient
-from kafka.conn import KafkaConnection
from kafka.errors import (
KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError,
UnknownTopicOrPartitionError, ConnectionError, FailedPayloadsError)
@@ -15,7 +13,6 @@ from kafka.protocol import KafkaProtocol, create_message
from kafka.protocol.metadata import MetadataResponse
from kafka.structs import ProduceRequestPayload, BrokerMetadata, TopicPartition
-from test.testutil import Timer
NO_ERROR = 0
UNKNOWN_TOPIC_OR_PARTITION = 3
@@ -91,7 +88,7 @@ class TestSimpleClient(unittest.TestCase):
('kafka02', 9092): MagicMock(),
('kafka03', 9092): MagicMock()
}
- # inject KafkaConnection side effects
+ # inject BrokerConnection side effects
mock_conn(mocked_conns[('kafka01', 9092)], success=False)
mock_conn(mocked_conns[('kafka03', 9092)], success=False)
future = Future()
@@ -389,19 +386,6 @@ class TestSimpleClient(unittest.TestCase):
with self.assertRaises(FailedPayloadsError):
client.send_produce_request(requests)
- def test_timeout(self):
- def _timeout(*args, **kwargs):
- timeout = args[1]
- sleep(timeout)
- raise socket.timeout
-
- with patch.object(socket, "create_connection", side_effect=_timeout):
-
- with Timer() as t:
- with self.assertRaises(ConnectionError):
- KafkaConnection("nowhere", 1234, 1.0)
- self.assertGreaterEqual(t.interval, 1.0)
-
def test_correlation_rollover(self):
with patch.object(SimpleClient, 'load_metadata_for_topics'):
big_num = 2**31 - 3
@@ -409,4 +393,3 @@ class TestSimpleClient(unittest.TestCase):
self.assertEqual(big_num + 1, client._next_id())
self.assertEqual(big_num + 2, client._next_id())
self.assertEqual(0, client._next_id())
-
diff --git a/test/test_conn.py b/test/test_conn.py
index 6a3b154..4f2b12f 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -7,7 +7,7 @@ import time
import pytest
-from kafka.conn import BrokerConnection, ConnectionStates
+from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
from kafka.protocol.api import RequestHeader
from kafka.protocol.metadata import MetadataRequest
@@ -29,14 +29,14 @@ def conn(_socket):
@pytest.mark.parametrize("states", [
- (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),),
- (([EALREADY, EALREADY], ConnectionStates.CONNECTING),),
- (([0], ConnectionStates.CONNECTED),),
- (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),
- ([ECONNRESET], ConnectionStates.DISCONNECTED)),
- (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),
- ([EALREADY], ConnectionStates.CONNECTING),
- ([EISCONN], ConnectionStates.CONNECTED)),
+ (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),),
+ (([EALREADY, EALREADY], ConnectionStates.CONNECTING),),
+ (([0], ConnectionStates.CONNECTED),),
+ (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),
+ ([ECONNRESET], ConnectionStates.DISCONNECTED)),
+ (([EINPROGRESS, EALREADY], ConnectionStates.CONNECTING),
+ ([EALREADY], ConnectionStates.CONNECTING),
+ ([EISCONN], ConnectionStates.CONNECTED)),
])
def test_connect(_socket, conn, states):
assert conn.state is ConnectionStates.DISCONNECTED
@@ -216,3 +216,51 @@ def test_recv(_socket, conn):
def test_close(conn):
pass # TODO
+
+
+def test_collect_hosts__happy_path():
+ hosts = "127.0.0.1:1234,127.0.0.1"
+ results = collect_hosts(hosts)
+ assert set(results) == set([
+ ('127.0.0.1', 1234, socket.AF_INET),
+ ('127.0.0.1', 9092, socket.AF_INET),
+ ])
+
+
+def test_collect_hosts__ipv6():
+ hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234"
+ results = collect_hosts(hosts)
+ assert set(results) == set([
+ ('localhost', 1234, socket.AF_INET6),
+ ('2001:1000:2000::1', 9092, socket.AF_INET6),
+ ('2001:1000:2000::1', 1234, socket.AF_INET6),
+ ])
+
+
+def test_collect_hosts__string_list():
+ hosts = [
+ 'localhost:1234',
+ 'localhost',
+ '[localhost]',
+ '2001::1',
+ '[2001::1]',
+ '[2001::1]:1234',
+ ]
+ results = collect_hosts(hosts)
+ assert set(results) == set([
+ ('localhost', 1234, socket.AF_UNSPEC),
+ ('localhost', 9092, socket.AF_UNSPEC),
+ ('localhost', 9092, socket.AF_INET6),
+ ('2001::1', 9092, socket.AF_INET6),
+ ('2001::1', 9092, socket.AF_INET6),
+ ('2001::1', 1234, socket.AF_INET6),
+ ])
+
+
+def test_collect_hosts__with_spaces():
+ hosts = "localhost:1234, localhost"
+ results = collect_hosts(hosts)
+ assert set(results) == set([
+ ('localhost', 1234, socket.AF_UNSPEC),
+ ('localhost', 9092, socket.AF_UNSPEC),
+ ])
diff --git a/test/test_conn_legacy.py b/test/test_conn_legacy.py
index 820c4e7..ca3b17a 100644
--- a/test/test_conn_legacy.py
+++ b/test/test_conn_legacy.py
@@ -1,12 +1,14 @@
import socket
import struct
from threading import Thread
+import time
import mock
from . import unittest
from kafka.errors import ConnectionError
-from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from test.testutil import Timer
class ConnTest(unittest.TestCase):
@@ -47,56 +49,6 @@ class ConnTest(unittest.TestCase):
# Reset any mock counts caused by __init__
self.MockCreateConn.reset_mock()
- def test_collect_hosts__happy_path(self):
- hosts = "127.0.0.1:1234,127.0.0.1"
- results = collect_hosts(hosts)
-
- self.assertEqual(set(results), set([
- ('127.0.0.1', 1234, socket.AF_INET),
- ('127.0.0.1', 9092, socket.AF_INET),
- ]))
-
- def test_collect_hosts__ipv6(self):
- hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234"
- results = collect_hosts(hosts)
-
- self.assertEqual(set(results), set([
- ('localhost', 1234, socket.AF_INET6),
- ('2001:1000:2000::1', 9092, socket.AF_INET6),
- ('2001:1000:2000::1', 1234, socket.AF_INET6),
- ]))
-
- def test_collect_hosts__string_list(self):
- hosts = [
- 'localhost:1234',
- 'localhost',
- '[localhost]',
- '2001::1',
- '[2001::1]',
- '[2001::1]:1234',
- ]
-
- results = collect_hosts(hosts)
-
- self.assertEqual(set(results), set([
- ('localhost', 1234, socket.AF_UNSPEC),
- ('localhost', 9092, socket.AF_UNSPEC),
- ('localhost', 9092, socket.AF_INET6),
- ('2001::1', 9092, socket.AF_INET6),
- ('2001::1', 9092, socket.AF_INET6),
- ('2001::1', 1234, socket.AF_INET6),
- ]))
-
- def test_collect_hosts__with_spaces(self):
- hosts = "localhost:1234, localhost"
- results = collect_hosts(hosts)
-
- self.assertEqual(set(results), set([
- ('localhost', 1234, socket.AF_UNSPEC),
- ('localhost', 9092, socket.AF_UNSPEC),
- ]))
-
-
def test_send(self):
self.conn.send(self.config['request_id'], self.config['payload'])
self.conn._sock.sendall.assert_called_with(self.config['payload'])
@@ -243,3 +195,16 @@ class TestKafkaConnection(unittest.TestCase):
self.assertEqual(err, [None])
self.assertEqual(socket.call_count, 2)
+
+ def test_timeout(self):
+ def _timeout(*args, **kwargs):
+ timeout = args[1]
+ time.sleep(timeout)
+ raise socket.timeout
+
+ with mock.patch.object(socket, "create_connection", side_effect=_timeout):
+
+ with Timer() as t:
+ with self.assertRaises(ConnectionError):
+ KafkaConnection("nowhere", 1234, 1.0)
+ self.assertGreaterEqual(t.interval, 1.0)