summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-04 18:57:32 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-07 09:49:05 -0700
commitd61e861b0da1647974e617e1633c429c307789be (patch)
treedad1382251212e88666baa20c7ec8d279487fa48 /test
parent931373478e30a9d44b89fad6491136222441e929 (diff)
downloadkafka-python-d61e861b0da1647974e617e1633c429c307789be.tar.gz
Consolidate conn fixture definitions
Diffstat (limited to 'test')
-rw-r--r--test/conftest.py19
-rw-r--r--test/test_client_async.py15
-rw-r--r--test/test_consumer_group.py14
-rw-r--r--test/test_coordinator.py13
4 files changed, 19 insertions, 42 deletions
diff --git a/test/conftest.py b/test/conftest.py
index f3a8947..a389480 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -31,3 +31,22 @@ def kafka_broker(version, zookeeper, request):
k.close()
request.addfinalizer(fin)
return k
+
+
+@pytest.fixture
+def conn(mocker):
+ from kafka.conn import ConnectionStates
+ from kafka.future import Future
+ from kafka.protocol.metadata import MetadataResponse
+ conn = mocker.patch('kafka.client_async.BrokerConnection')
+ conn.return_value = conn
+ conn.state = ConnectionStates.CONNECTED
+ conn.send.return_value = Future().success(
+ MetadataResponse[0](
+ [(0, 'foo', 12), (1, 'bar', 34)], # brokers
+ [])) # topics
+ conn.blacked_out.return_value = False
+ conn.connect.side_effect = lambda: conn.state
+ conn.connecting = lambda: conn.connect() is ConnectionStates.CONNECTING
+ conn.connected = lambda: conn.connect() is ConnectionStates.CONNECTED
+ return conn
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 2cf348c..c326d55 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -31,21 +31,6 @@ def test_bootstrap_servers(mocker, bootstrap, expected_hosts):
assert sorted(hosts) == sorted(expected_hosts)
-@pytest.fixture
-def conn(mocker):
- conn = mocker.patch('kafka.client_async.BrokerConnection')
- conn.return_value = conn
- conn.state = ConnectionStates.CONNECTED
- conn.send.return_value = Future().success(
- MetadataResponse[0](
- [(0, 'foo', 12), (1, 'bar', 34)], # brokers
- [])) # topics
- conn.blacked_out.return_value = False
- conn.connect.side_effect = lambda: conn.state
- conn.connected = lambda: conn.connect() is ConnectionStates.CONNECTED
- return conn
-
-
def test_bootstrap_success(conn):
conn.state = ConnectionStates.CONNECTED
cli = KafkaClient()
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index fe66d2b..d8a0041 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -9,8 +9,6 @@ import six
from kafka import SimpleClient
from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
-from kafka.future import Future
-from kafka.protocol.metadata import MetadataResponse
from kafka.structs import TopicPartition
from test.conftest import version
@@ -140,18 +138,6 @@ def test_paused(kafka_broker, topic):
assert set() == consumer.paused()
-@pytest.fixture
-def conn(mocker):
- conn = mocker.patch('kafka.client_async.BrokerConnection')
- conn.return_value = conn
- conn.state = ConnectionStates.CONNECTED
- conn.send.return_value = Future().success(
- MetadataResponse[0](
- [(0, 'foo', 12), (1, 'bar', 34)], # brokers
- [])) # topics
- return conn
-
-
def test_heartbeat_timeout(conn, mocker):
mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9')
mocker.patch('time.time', return_value = 1234)
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 629b72f..399609d 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -12,7 +12,6 @@ from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.protocol import (
ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)
-from kafka.conn import ConnectionStates
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.commit import (
@@ -23,18 +22,6 @@ from kafka.util import WeakMethod
@pytest.fixture
-def conn(mocker):
- conn = mocker.patch('kafka.client_async.BrokerConnection')
- conn.return_value = conn
- conn.state = ConnectionStates.CONNECTED
- conn.send.return_value = Future().success(
- MetadataResponse[0](
- [(0, 'foo', 12), (1, 'bar', 34)], # brokers
- [])) # topics
- return conn
-
-
-@pytest.fixture
def coordinator(conn):
return ConsumerCoordinator(KafkaClient(), SubscriptionState())