From 2b6d063085dc6bab9e84cc5c714be5cf2716fe38 Mon Sep 17 00:00:00 2001 From: Zack Dever Date: Wed, 16 Mar 2016 16:21:37 -0700 Subject: KAFKA-2698: add paused API --- test/test_consumer_group.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) (limited to 'test/test_consumer_group.py') diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 34b1be4..5fcfbe2 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -17,10 +17,13 @@ from test.conftest import version from test.testutil import random_string +def get_connect_str(kafka_broker): + return 'localhost:' + str(kafka_broker.port) + + @pytest.fixture def simple_client(kafka_broker): - connect_str = 'localhost:' + str(kafka_broker.port) - return SimpleClient(connect_str) + return SimpleClient(get_connect_str(kafka_broker)) @pytest.fixture @@ -37,8 +40,7 @@ def test_consumer(kafka_broker, version): if version >= (0, 8, 2) and version < (0, 9): topic(simple_client(kafka_broker)) - connect_str = 'localhost:' + str(kafka_broker.port) - consumer = KafkaConsumer(bootstrap_servers=connect_str) + consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) consumer.poll(500) assert len(consumer._client._conns) > 0 node_id = list(consumer._client._conns.keys())[0] @@ -49,7 +51,7 @@ def test_consumer(kafka_broker, version): @pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") def test_group(kafka_broker, topic): num_partitions = 4 - connect_str = 'localhost:' + str(kafka_broker.port) + connect_str = get_connect_str(kafka_broker) consumers = {} stop = {} threads = {} @@ -120,6 +122,24 @@ def test_group(kafka_broker, topic): threads[c].join() +@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +def test_paused(kafka_broker, topic): + consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) + topics = [TopicPartition(topic, 1)] + consumer.assign(topics) + assert set(topics) == consumer.assignment() + assert set() == consumer.paused() + + consumer.pause(topics[0]) + assert set([topics[0]]) == consumer.paused() + + consumer.resume(topics[0]) + assert set() == consumer.paused() + + consumer.unsubscribe() + assert set() == consumer.paused() + + @pytest.fixture def conn(mocker): conn = mocker.patch('kafka.client_async.BrokerConnection') -- cgit v1.2.1