summaryrefslogtreecommitdiff
path: root/test/test_consumer_group.py
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2019-08-22 01:58:28 -0700
committerGitHub <noreply@github.com>2019-08-22 01:58:28 -0700
commit98c005852e36fde0ef44a7b9c60a54f4686651af (patch)
tree82f0f218064cd8163addc43042a71207e86cadc9 /test/test_consumer_group.py
parente49caeb3ebdd36eb4d18a517bc402f8e89dfdbee (diff)
downloadkafka-python-98c005852e36fde0ef44a7b9c60a54f4686651af.tar.gz
Cleanup handling of KAFKA_VERSION env var in tests (#1887)
Now that we are using `pytest`, there is no need for a custom decorator because we can use `pytest.mark.skipif()`. This makes the code significantly simpler. In particular, dropping the custom `@kafka_versions()` decorator is necessary because it uses `func.wraps()` which doesn't play nice with `pytest` fixtures: - https://github.com/pytest-dev/pytest/issues/677 - https://stackoverflow.com/a/19614807/770425 So this is a pre-requisite to migrating some of those tests to using pytest fixtures.
Diffstat (limited to 'test/test_consumer_group.py')
-rw-r--r--test/test_consumer_group.py18
1 files changed, 8 insertions, 10 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index ecc6d38..3367617 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -11,15 +11,15 @@ from kafka.consumer.group import KafkaConsumer
from kafka.coordinator.base import MemberState
from kafka.structs import TopicPartition
-from test.fixtures import random_string, version
+from test.testutil import env_kafka_version, random_string
def get_connect_str(kafka_broker):
return kafka_broker.host + ':' + str(kafka_broker.port)
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
-def test_consumer(kafka_broker, topic, version):
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
+def test_consumer(kafka_broker, topic):
# The `topic` fixture is included because
# 0.8.2 brokers need a topic to function well
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
@@ -29,8 +29,8 @@ def test_consumer(kafka_broker, topic, version):
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
consumer.close()
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
-def test_consumer_topics(kafka_broker, topic, version):
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
+def test_consumer_topics(kafka_broker, topic):
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
# Necessary to drive the IO
consumer.poll(500)
@@ -38,8 +38,7 @@ def test_consumer_topics(kafka_broker, topic, version):
assert len(consumer.partitions_for_topic(topic)) > 0
consumer.close()
-@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_group(kafka_broker, topic):
num_partitions = 4
connect_str = get_connect_str(kafka_broker)
@@ -129,7 +128,7 @@ def test_group(kafka_broker, topic):
threads[c] = None
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_paused(kafka_broker, topic):
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
topics = [TopicPartition(topic, 1)]
@@ -148,8 +147,7 @@ def test_paused(kafka_broker, topic):
consumer.close()
-@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
def test_heartbeat_thread(kafka_broker, topic):
group_id = 'test-group-' + random_string(6)
consumer = KafkaConsumer(topic,