diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/conftest.py | 19 | ||||
-rw-r--r-- | test/test_client_async.py | 15 | ||||
-rw-r--r-- | test/test_consumer_group.py | 14 | ||||
-rw-r--r-- | test/test_coordinator.py | 13 |
4 files changed, 19 insertions, 42 deletions
diff --git a/test/conftest.py b/test/conftest.py index f3a8947..a389480 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -31,3 +31,22 @@ def kafka_broker(version, zookeeper, request): k.close() request.addfinalizer(fin) return k + + +@pytest.fixture +def conn(mocker): + from kafka.conn import ConnectionStates + from kafka.future import Future + from kafka.protocol.metadata import MetadataResponse + conn = mocker.patch('kafka.client_async.BrokerConnection') + conn.return_value = conn + conn.state = ConnectionStates.CONNECTED + conn.send.return_value = Future().success( + MetadataResponse[0]( + [(0, 'foo', 12), (1, 'bar', 34)], # brokers + [])) # topics + conn.blacked_out.return_value = False + conn.connect.side_effect = lambda: conn.state + conn.connecting = lambda: conn.connect() is ConnectionStates.CONNECTING + conn.connected = lambda: conn.connect() is ConnectionStates.CONNECTED + return conn diff --git a/test/test_client_async.py b/test/test_client_async.py index 2cf348c..c326d55 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -31,21 +31,6 @@ def test_bootstrap_servers(mocker, bootstrap, expected_hosts): assert sorted(hosts) == sorted(expected_hosts) -@pytest.fixture -def conn(mocker): - conn = mocker.patch('kafka.client_async.BrokerConnection') - conn.return_value = conn - conn.state = ConnectionStates.CONNECTED - conn.send.return_value = Future().success( - MetadataResponse[0]( - [(0, 'foo', 12), (1, 'bar', 34)], # brokers - [])) # topics - conn.blacked_out.return_value = False - conn.connect.side_effect = lambda: conn.state - conn.connected = lambda: conn.connect() is ConnectionStates.CONNECTED - return conn - - def test_bootstrap_success(conn): conn.state = ConnectionStates.CONNECTED cli = KafkaClient() diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index fe66d2b..d8a0041 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -9,8 +9,6 @@ import six from kafka import SimpleClient from kafka.conn import ConnectionStates from kafka.consumer.group import KafkaConsumer -from kafka.future import Future -from kafka.protocol.metadata import MetadataResponse from kafka.structs import TopicPartition from test.conftest import version @@ -140,18 +138,6 @@ def test_paused(kafka_broker, topic): assert set() == consumer.paused() -@pytest.fixture -def conn(mocker): - conn = mocker.patch('kafka.client_async.BrokerConnection') - conn.return_value = conn - conn.state = ConnectionStates.CONNECTED - conn.send.return_value = Future().success( - MetadataResponse[0]( - [(0, 'foo', 12), (1, 'bar', 34)], # brokers - [])) # topics - return conn - - def test_heartbeat_timeout(conn, mocker): mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9') mocker.patch('time.time', return_value = 1234) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 629b72f..399609d 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -12,7 +12,6 @@ from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.protocol import ( ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment) -from kafka.conn import ConnectionStates import kafka.errors as Errors from kafka.future import Future from kafka.protocol.commit import ( @@ -23,18 +22,6 @@ from kafka.util import WeakMethod @pytest.fixture -def conn(mocker): - conn = mocker.patch('kafka.client_async.BrokerConnection') - conn.return_value = conn - conn.state = ConnectionStates.CONNECTED - conn.send.return_value = Future().success( - MetadataResponse[0]( - [(0, 'foo', 12), (1, 'bar', 34)], # brokers - [])) # topics - return conn - - -@pytest.fixture def coordinator(conn): return ConsumerCoordinator(KafkaClient(), SubscriptionState()) |