From 98c005852e36fde0ef44a7b9c60a54f4686651af Mon Sep 17 00:00:00 2001
From: Jeff Widman
Date: Thu, 22 Aug 2019 01:58:28 -0700
Subject: Cleanup handling of KAFKA_VERSION env var in tests (#1887)

Now that we are using `pytest`, there is no need for a custom decorator
because we can use `pytest.mark.skipif()`.

This makes the code significantly simpler. In particular, dropping the
custom `@kafka_versions()` decorator is necessary because it uses
`functools.wraps()`, which doesn't play nice with `pytest` fixtures:
- https://github.com/pytest-dev/pytest/issues/677
- https://stackoverflow.com/a/19614807/770425

So this is a prerequisite to migrating some of those tests to using
pytest fixtures.
---
 test/conftest.py                  | 14 ++-----
 test/fixtures.py                  | 25 ++-----------
 test/test_client_integration.py   |  6 ++-
 test/test_codec.py                |  2 +-
 test/test_consumer_group.py       | 18 ++++-----
 test/test_consumer_integration.py | 42 ++++++++++-----------
 test/test_failover_integration.py |  4 +-
 test/test_producer.py             | 10 ++---
 test/test_producer_integration.py |  8 ++--
 test/testutil.py                  | 78 +++++++--------------------------------
 10 files changed, 65 insertions(+), 142 deletions(-)

diff --git a/test/conftest.py b/test/conftest.py
index b6d3e3e..5015cc7 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -2,14 +2,8 @@ from __future__ import absolute_import
 
 import pytest
 
-from test.fixtures import KafkaFixture, ZookeeperFixture, random_string, version as kafka_version
-
-
-@pytest.fixture(scope="module")
-def version():
-    """Return the Kafka version set in the OS environment"""
-    return kafka_version()
-
+from test.testutil import env_kafka_version, random_string
+from test.fixtures import KafkaFixture, ZookeeperFixture
 
 @pytest.fixture(scope="module")
 def zookeeper():
@@ -26,9 +20,9 @@ def kafka_broker(kafka_broker_factory):
 
 
 @pytest.fixture(scope="module")
-def kafka_broker_factory(version, zookeeper):
+def kafka_broker_factory(zookeeper):
     """Return a Kafka broker fixture factory"""
-    assert version, 'KAFKA_VERSION must be specified to run integration tests'
+    assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests'
 
     _brokers = []
     def factory(**broker_params):
diff --git a/test/fixtures.py b/test/fixtures.py
index ff6b687..c7748f1 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -4,9 +4,7 @@ import atexit
 import logging
 import os
 import os.path
-import random
 import socket
-import string
 import subprocess
 import time
 import uuid
@@ -19,29 +17,12 @@ from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
 from kafka.client_async import KafkaClient
 from kafka.protocol.admin import CreateTopicsRequest
 from kafka.protocol.metadata import MetadataRequest
+from test.testutil import env_kafka_version, random_string
 from test.service import ExternalService, SpawnedService
 
 log = logging.getLogger(__name__)
 
 
-def random_string(length):
-    return "".join(random.choice(string.ascii_letters) for i in range(length))
-
-
-def version_str_to_tuple(version_str):
-    """Transform a version string into a tuple.
- - Example: '0.8.1.1' --> (0, 8, 1, 1) - """ - return tuple(map(int, version_str.split('.'))) - - -def version(): - if 'KAFKA_VERSION' not in os.environ: - return () - return version_str_to_tuple(os.environ['KAFKA_VERSION']) - - def get_open_port(): sock = socket.socket() sock.bind(("", 0)) @@ -477,7 +458,7 @@ class KafkaFixture(Fixture): num_partitions == self.partitions and \ replication_factor == self.replicas: self._send_request(MetadataRequest[0]([topic_name])) - elif version() >= (0, 10, 1, 0): + elif env_kafka_version() >= (0, 10, 1, 0): request = CreateTopicsRequest[0]([(topic_name, num_partitions, replication_factor, [], [])], timeout_ms) result = self._send_request(request, timeout=timeout_ms) @@ -497,7 +478,7 @@ class KafkaFixture(Fixture): '--replication-factor', self.replicas \ if replication_factor is None \ else replication_factor) - if version() >= (0, 10): + if env_kafka_version() >= (0, 10): args.append('--if-not-exists') env = self.kafka_run_class_env() proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) diff --git a/test/test_client_integration.py b/test/test_client_integration.py index df0faef..a983ce1 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -1,5 +1,7 @@ import os +import pytest + from kafka.errors import KafkaTimeoutError from kafka.protocol import create_message from kafka.structs import ( @@ -7,7 +9,7 @@ from kafka.structs import ( ProduceRequestPayload) from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, kafka_versions +from test.testutil import KafkaIntegrationTestCase, env_kafka_version class TestKafkaClientIntegration(KafkaIntegrationTestCase): @@ -80,7 +82,7 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): # Offset Tests # #################### - @kafka_versions('>=0.8.1') + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_commit_fetch_offsets(self): req = OffsetCommitRequestPayload(self.topic, 0, 42, 'metadata') (resp,) = self.client.send_offset_commit_request('group', [req]) diff --git a/test/test_codec.py b/test/test_codec.py index 3c4d2df..9eff888 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -14,7 +14,7 @@ from kafka.codec import ( lz4_encode_old_kafka, lz4_decode_old_kafka, ) -from test.fixtures import random_string +from test.testutil import random_string def test_gzip(): diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index ecc6d38..3367617 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -11,15 +11,15 @@ from kafka.consumer.group import KafkaConsumer from kafka.coordinator.base import MemberState from kafka.structs import TopicPartition -from test.fixtures import random_string, version +from test.testutil import env_kafka_version, random_string def get_connect_str(kafka_broker): return kafka_broker.host + ':' + str(kafka_broker.port) -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") -def test_consumer(kafka_broker, topic, version): +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +def test_consumer(kafka_broker, topic): # The `topic` fixture is included because # 0.8.2 brokers need a topic to function well consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) @@ -29,8 +29,8 @@ def test_consumer(kafka_broker, topic, version): assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED consumer.close() 
-@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") -def test_consumer_topics(kafka_broker, topic, version): +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +def test_consumer_topics(kafka_broker, topic): consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) # Necessary to drive the IO consumer.poll(500) @@ -38,8 +38,7 @@ def test_consumer_topics(kafka_broker, topic, version): assert len(consumer.partitions_for_topic(topic)) > 0 consumer.close() -@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version') -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') def test_group(kafka_broker, topic): num_partitions = 4 connect_str = get_connect_str(kafka_broker) @@ -129,7 +128,7 @@ def test_group(kafka_broker, topic): threads[c] = None -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_paused(kafka_broker, topic): consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) topics = [TopicPartition(topic, 1)] @@ -148,8 +147,7 @@ def test_paused(kafka_broker, topic): consumer.close() -@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version') -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version') def test_heartbeat_thread(kafka_broker, topic): group_id = 'test-group-' + random_string(6) consumer = KafkaConsumer(topic, diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index fdffd05..cb05242 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,19 +1,18 @@ import logging import os import time -from mock import patch -import pytest -import kafka.codec +from mock import patch import pytest -from kafka.vendor.six.moves import range from kafka.vendor import six +from kafka.vendor.six.moves import range from . 
import unittest from kafka import ( KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message, KafkaProducer ) +import kafka.codec from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ( ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError, @@ -23,11 +22,11 @@ from kafka.structs import ( ProduceRequestPayload, TopicPartition, OffsetAndTimestamp ) -from test.fixtures import ZookeeperFixture, KafkaFixture, random_string, version -from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer +from test.fixtures import ZookeeperFixture, KafkaFixture +from test.testutil import KafkaIntegrationTestCase, Timer, env_kafka_version, random_string -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory): """Test KafkaConsumer""" kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest') @@ -54,7 +53,7 @@ def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory): kafka_consumer.close() -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_kafka_consumer_unsupported_encoding( topic, kafka_producer_factory, kafka_consumer_factory): # Send a compressed message @@ -211,7 +210,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(OffsetOutOfRangeError): consumer.get_message() - @kafka_versions('>=0.8.1') + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_simple_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -388,7 +387,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - @kafka_versions('>=0.8.1') + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_multi_process_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) @@ -459,7 +458,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): big_consumer.stop() - @kafka_versions('>=0.8.1') + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -491,7 +490,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer2.stop() @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - @kafka_versions('>=0.8.1') + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_multi_process_offset_behavior__resuming_behavior(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -548,6 +547,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): messages = [ message for message in consumer ] self.assertEqual(len(messages), 2) + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_kafka_consumer__blocking(self): TIMEOUT_MS = 500 consumer = self.kafka_consumer(auto_offset_reset='earliest', @@ -586,7 +586,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 ) consumer.close() - @kafka_versions('>=0.8.1') + 
@pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1") def test_kafka_consumer__offset_commit_resume(self): GROUP_ID = random_string(10) @@ -605,7 +605,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): output_msgs1 = [] for _ in range(180): m = next(consumer1) - output_msgs1.append(m) + output_msgs1.append((m.key, m.value)) self.assert_message_count(output_msgs1, 180) consumer1.close() @@ -621,12 +621,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): output_msgs2 = [] for _ in range(20): m = next(consumer2) - output_msgs2.append(m) + output_msgs2.append((m.key, m.value)) self.assert_message_count(output_msgs2, 20) self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200) consumer2.close() - @kafka_versions('>=0.10.1') + @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_max_bytes_simple(self): self.send_messages(0, range(100, 200)) self.send_messages(1, range(200, 300)) @@ -647,7 +647,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)])) consumer.close() - @kafka_versions('>=0.10.1') + @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_max_bytes_one_msg(self): # We send to only 1 partition so we don't have parallel requests to 2 # nodes for data. @@ -673,7 +673,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(len(fetched_msgs), 10) consumer.close() - @kafka_versions('>=0.10.1') + @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_offsets_for_time(self): late_time = int(time.time()) * 1000 middle_time = late_time - 1000 @@ -727,7 +727,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): }) consumer.close() - @kafka_versions('>=0.10.1') + @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_offsets_search_many_partitions(self): tp0 = TopicPartition(self.topic, 0) tp1 = TopicPartition(self.topic, 1) @@ -766,7 +766,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): }) consumer.close() - @kafka_versions('<0.10.1') + @pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1") def test_kafka_consumer_offsets_for_time_old(self): consumer = self.kafka_consumer() tp = TopicPartition(self.topic, 0) @@ -774,7 +774,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(UnsupportedVersionError): consumer.offsets_for_times({tp: int(time.time())}) - @kafka_versions('>=0.10.1') + @pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1") def test_kafka_consumer_offsets_for_times_errors(self): consumer = self.kafka_consumer(fetch_max_wait_ms=200, request_timeout_ms=500) diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 48021a4..ad7dcb9 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -9,8 +9,8 @@ from kafka.errors import ( from kafka.producer.base import Producer from kafka.structs import TopicPartition -from test.fixtures import ZookeeperFixture, KafkaFixture, random_string -from test.testutil import KafkaIntegrationTestCase +from test.fixtures import ZookeeperFixture, KafkaFixture +from test.testutil import KafkaIntegrationTestCase, random_string log = 
logging.getLogger(__name__) diff --git a/test/test_producer.py b/test/test_producer.py index 60b19bf..9605adf 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -7,7 +7,7 @@ import pytest from kafka import KafkaConsumer, KafkaProducer, TopicPartition from kafka.producer.buffer import SimpleBufferPool -from test.fixtures import random_string, version +from test.testutil import env_kafka_version, random_string def test_buffer_pool(): @@ -22,13 +22,13 @@ def test_buffer_pool(): assert buf2.read() == b'' -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) def test_end_to_end(kafka_broker, compression): if compression == 'lz4': # LZ4 requires 0.8.2 - if version() < (0, 8, 2): + if env_kafka_version() < (0, 8, 2): return # python-lz4 crashes on older versions of pypy elif platform.python_implementation() == 'PyPy': @@ -80,7 +80,7 @@ def test_kafka_producer_gc_cleanup(): assert threading.active_count() == threads -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) def test_kafka_producer_proper_record_metadata(kafka_broker, compression): connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) @@ -91,7 +91,7 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression): magic = producer._max_usable_produce_magic() # record headers are supported in 0.11.0 - if version() < (0, 11, 0): + if env_kafka_version() < (0, 11, 0): headers = None else: headers = [("Header Key", b"Header Value")] diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 7109886..e0939a6 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -15,8 +15,8 @@ from kafka.errors import UnknownTopicOrPartitionError, LeaderNotAvailableError from kafka.producer.base import Producer from kafka.structs import FetchRequestPayload, ProduceRequestPayload -from test.fixtures import ZookeeperFixture, KafkaFixture, version -from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset +from test.fixtures import ZookeeperFixture, KafkaFixture +from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset # TODO: This duplicates a TestKafkaProducerIntegration method temporarily @@ -43,7 +43,7 @@ def assert_produce_response(resp, initial_offset): assert resp[0].offset == initial_offset -@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_produce_many_simple(simple_client, topic): """Test multiple produces using the SimpleClient """ @@ -353,7 +353,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): # KeyedProducer Tests # ############################ - @kafka_versions('>=0.8.1') + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_keyedproducer_null_payload(self): partitions = self.client.get_partition_ids_for_topic(self.topic) start_offsets = [self.current_offset(self.topic, p) for p in partitions] diff --git a/test/testutil.py b/test/testutil.py index 781c364..3272262 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -1,8 +1,8 @@ from __future__ import absolute_import -import functools -import operator import os 
+import random +import string import time import uuid @@ -16,72 +16,20 @@ from kafka.errors import ( FailedPayloadsError ) from kafka.structs import OffsetRequestPayload -from test.fixtures import random_string, version_str_to_tuple, version as kafka_version #pylint: disable=wrong-import-order -def kafka_versions(*versions): - """ - Describe the Kafka versions this test is relevant to. - - The versions are passed in as strings, for example: - '0.11.0' - '>=0.10.1.0' - '>0.9', '<1.0' # since this accepts multiple versions args - - The current KAFKA_VERSION will be evaluated against this version. If the - result is False, then the test is skipped. Similarly, if KAFKA_VERSION is - not set the test is skipped. - - Note: For simplicity, this decorator accepts Kafka versions as strings even - though the similarly functioning `api_version` only accepts tuples. Trying - to convert it to tuples quickly gets ugly due to mixing operator strings - alongside version tuples. While doable when one version is passed in, it - isn't pretty when multiple versions are passed in. - """ +def random_string(length): + return "".join(random.choice(string.ascii_letters) for i in range(length)) - def construct_lambda(s): - if s[0].isdigit(): - op_str = '=' - v_str = s - elif s[1].isdigit(): - op_str = s[0] # ! < > = - v_str = s[1:] - elif s[2].isdigit(): - op_str = s[0:2] # >= <= - v_str = s[2:] - else: - raise ValueError('Unrecognized kafka version / operator: %s' % (s,)) - - op_map = { - '=': operator.eq, - '!': operator.ne, - '>': operator.gt, - '<': operator.lt, - '>=': operator.ge, - '<=': operator.le - } - op = op_map[op_str] - version = version_str_to_tuple(v_str) - return lambda a: op(a, version) - - validators = map(construct_lambda, versions) - - def real_kafka_versions(func): - @functools.wraps(func) - def wrapper(func, *args, **kwargs): - version = kafka_version() - - if not version: - pytest.skip("no kafka version set in KAFKA_VERSION env var") - - for f in validators: - if not f(version): - pytest.skip("unsupported kafka version") - - return func(*args, **kwargs) - return wrapper - - return real_kafka_versions + +def env_kafka_version(): + """Return the Kafka version set in the OS environment as a tuple. + + Example: '0.8.1.1' --> (0, 8, 1, 1) + """ + if 'KAFKA_VERSION' not in os.environ: + return () + return tuple(map(int, os.environ['KAFKA_VERSION'].split('.'))) def current_offset(client, topic, partition, kafka_broker=None): -- cgit v1.2.1
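
For readers skimming the diff: every test conversion in this patch follows the same before/after pattern. A minimal sketch of that pattern (the test name and threshold below are illustrative, not lifted from a specific hunk):

    import pytest

    from test.testutil import env_kafka_version


    # Before: the custom decorator wrapped the test function (via
    # functools.wraps()), which broke pytest's fixture-argument
    # introspection:
    #
    #     @kafka_versions('>=0.10.1')
    #     def test_something(self):
    #         ...

    # After: a plain marker evaluated at collection time; the test
    # function itself is left unwrapped, so pytest fixtures keep working.
    @pytest.mark.skipif(env_kafka_version() < (0, 10, 1),
                        reason="Requires KAFKA_VERSION >= 0.10.1")
    def test_something():
        ...

Because `env_kafka_version()` returns an empty tuple when KAFKA_VERSION is unset, and an empty tuple compares less than any non-empty version tuple, the `< (0, 10, 1)` guard also skips these tests when no broker version is configured, matching the old decorator's behavior of skipping in that case.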