diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2019-10-11 12:03:22 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-11 12:03:22 -0700 |
commit | 3631bfa009a28767a2057c9beee470acaa6597d5 (patch) | |
tree | e10b73861a33d83a95b6496ef3074ee3caeaae41 /test/test_consumer_integration.py | |
parent | 6d3800ca9f45fd953689a1787fc90a5e566e34ea (diff) | |
download | kafka-python-3631bfa009a28767a2057c9beee470acaa6597d5.tar.gz |
Remove SimpleClient, Producer, Consumer, Unittest (#1196)
In the 2.0 release, we're removing:
* `SimpleClient`
* `SimpleConsumer`
* `SimpleProducer`
* Old partitioners used by `SimpleProducer`; these are superceded by
the `DefaultPartitioner`
These have been deprecated for several years in favor of `KafkaClient`
/ `KafkaConsumer` / `KafkaProducer`.
Since 2.0 allows breaking changes, we are removing the deprecated
classes.
Additionally, since the only usage of `unittest` was in tests for these
old Simple* clients, this also drops `unittest` from the library. All
tests now run under `pytest`.
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 498 |
1 files changed, 3 insertions, 495 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index d6fd41c..6e6bc94 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,29 +1,17 @@ import logging -import os import time from mock import patch import pytest from kafka.vendor.six.moves import range -from . import unittest -from kafka import ( - KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, - create_gzip_message, KafkaProducer -) import kafka.codec -from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ( - ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError, - KafkaTimeoutError, UnsupportedCodecError -) -from kafka.protocol.message import PartialMessage -from kafka.structs import ( - ProduceRequestPayload, TopicPartition, OffsetAndTimestamp + KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError ) +from kafka.structs import TopicPartition, OffsetAndTimestamp -from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, Timer, assert_message_count, env_kafka_version, random_string +from test.testutil import Timer, assert_message_count, env_kafka_version, random_string @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @@ -63,486 +51,6 @@ def test_kafka_consumer_unsupported_encoding( consumer.poll(timeout_ms=2000) -class TestConsumerIntegration(KafkaIntegrationTestCase): - maxDiff = None - - @classmethod - def setUpClass(cls): - if not os.environ.get('KAFKA_VERSION'): - return - - cls.zk = ZookeeperFixture.instance() - chroot = random_string(10) - cls.server1 = KafkaFixture.instance(0, cls.zk, - zk_chroot=chroot) - cls.server2 = KafkaFixture.instance(1, cls.zk, - zk_chroot=chroot) - - cls.server = cls.server1 # Bootstrapping server - - @classmethod - def tearDownClass(cls): - if not os.environ.get('KAFKA_VERSION'): - return - - cls.server1.close() - cls.server2.close() - cls.zk.close() - - def send_messages(self, partition, messages): - messages = [ create_message(self.msg(str(msg))) for msg in messages ] - produce = ProduceRequestPayload(self.topic, partition, messages = messages) - resp, = self.client.send_produce_request([produce]) - self.assertEqual(resp.error, 0) - - return [ x.value for x in messages ] - - def send_gzip_message(self, partition, messages): - message = create_gzip_message([(self.msg(str(msg)), None) for msg in messages]) - produce = ProduceRequestPayload(self.topic, partition, messages = [message]) - resp, = self.client.send_produce_request([produce]) - self.assertEqual(resp.error, 0) - - def assert_message_count(self, messages, num_messages): - # Make sure we got them all - self.assertEqual(len(messages), num_messages) - - # Make sure there are no duplicates - self.assertEqual(len(set(messages)), num_messages) - - def consumer(self, **kwargs): - if os.environ['KAFKA_VERSION'] == "0.8.0": - # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off - kwargs['group'] = None - kwargs['auto_commit'] = False - else: - kwargs.setdefault('group', None) - kwargs.setdefault('auto_commit', False) - - consumer_class = kwargs.pop('consumer', SimpleConsumer) - group = kwargs.pop('group', None) - topic = kwargs.pop('topic', self.topic) - - if consumer_class in [SimpleConsumer, MultiProcessConsumer]: - kwargs.setdefault('iter_timeout', 0) - - return consumer_class(self.client, group, topic, **kwargs) - - def kafka_consumer(self, **configs): - brokers = '%s:%d' % (self.server.host, self.server.port) - consumer = KafkaConsumer(self.topic, - bootstrap_servers=brokers, - **configs) - return consumer - - def kafka_producer(self, **configs): - brokers = '%s:%d' % (self.server.host, self.server.port) - producer = KafkaProducer( - bootstrap_servers=brokers, **configs) - return producer - - def test_simple_consumer(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Start a consumer - consumer = self.consumer() - - self.assert_message_count([ message for message in consumer ], 200) - - consumer.stop() - - def test_simple_consumer_gzip(self): - self.send_gzip_message(0, range(0, 100)) - self.send_gzip_message(1, range(100, 200)) - - # Start a consumer - consumer = self.consumer() - - self.assert_message_count([ message for message in consumer ], 200) - - consumer.stop() - - def test_simple_consumer_smallest_offset_reset(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - consumer = self.consumer(auto_offset_reset='smallest') - # Move fetch offset ahead of 300 message (out of range) - consumer.seek(300, 2) - # Since auto_offset_reset is set to smallest we should read all 200 - # messages from beginning. - self.assert_message_count([message for message in consumer], 200) - - def test_simple_consumer_largest_offset_reset(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Default largest - consumer = self.consumer() - # Move fetch offset ahead of 300 message (out of range) - consumer.seek(300, 2) - # Since auto_offset_reset is set to largest we should not read any - # messages. - self.assert_message_count([message for message in consumer], 0) - # Send 200 new messages to the queue - self.send_messages(0, range(200, 300)) - self.send_messages(1, range(300, 400)) - # Since the offset is set to largest we should read all the new messages. - self.assert_message_count([message for message in consumer], 200) - - def test_simple_consumer_no_reset(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Default largest - consumer = self.consumer(auto_offset_reset=None) - # Move fetch offset ahead of 300 message (out of range) - consumer.seek(300, 2) - with self.assertRaises(OffsetOutOfRangeError): - consumer.get_message() - - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_simple_consumer_load_initial_offsets(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Create 1st consumer and change offsets - consumer = self.consumer(group='test_simple_consumer_load_initial_offsets') - self.assertEqual(consumer.offsets, {0: 0, 1: 0}) - consumer.offsets.update({0:51, 1:101}) - # Update counter after manual offsets update - consumer.count_since_commit += 1 - consumer.commit() - - # Create 2nd consumer and check initial offsets - consumer = self.consumer(group='test_simple_consumer_load_initial_offsets', - auto_commit=False) - self.assertEqual(consumer.offsets, {0: 51, 1: 101}) - - def test_simple_consumer__seek(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - consumer = self.consumer() - - # Rewind 10 messages from the end - consumer.seek(-10, 2) - self.assert_message_count([ message for message in consumer ], 10) - - # Rewind 13 messages from the end - consumer.seek(-13, 2) - self.assert_message_count([ message for message in consumer ], 13) - - # Set absolute offset - consumer.seek(100) - self.assert_message_count([ message for message in consumer ], 0) - consumer.seek(100, partition=0) - self.assert_message_count([ message for message in consumer ], 0) - consumer.seek(101, partition=1) - self.assert_message_count([ message for message in consumer ], 0) - consumer.seek(90, partition=0) - self.assert_message_count([ message for message in consumer ], 10) - consumer.seek(20, partition=1) - self.assert_message_count([ message for message in consumer ], 80) - consumer.seek(0, partition=1) - self.assert_message_count([ message for message in consumer ], 100) - - consumer.stop() - - @pytest.mark.skipif(env_kafka_version() >= (2, 0), - reason="SimpleConsumer blocking does not handle PartialMessage change in kafka 2.0+") - def test_simple_consumer_blocking(self): - consumer = self.consumer() - - # Ask for 5 messages, nothing in queue, block 1 second - with Timer() as t: - messages = consumer.get_messages(block=True, timeout=1) - self.assert_message_count(messages, 0) - self.assertGreaterEqual(t.interval, 1) - - self.send_messages(0, range(0, 5)) - self.send_messages(1, range(5, 10)) - - # Ask for 5 messages, 10 in queue. Get 5 back, no blocking - with Timer() as t: - messages = consumer.get_messages(count=5, block=True, timeout=3) - self.assert_message_count(messages, 5) - self.assertLess(t.interval, 3) - - # Ask for 10 messages, get 5 back, block 1 second - with Timer() as t: - messages = consumer.get_messages(count=10, block=True, timeout=1) - self.assert_message_count(messages, 5) - self.assertGreaterEqual(t.interval, 1) - - # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 - # second, get 5 back, no blocking - self.send_messages(0, range(0, 3)) - self.send_messages(1, range(3, 5)) - with Timer() as t: - messages = consumer.get_messages(count=10, block=1, timeout=1) - self.assert_message_count(messages, 5) - self.assertLessEqual(t.interval, 1) - - consumer.stop() - - def test_simple_consumer_pending(self): - # make sure that we start with no pending messages - consumer = self.consumer() - self.assertEquals(consumer.pending(), 0) - self.assertEquals(consumer.pending(partitions=[0]), 0) - self.assertEquals(consumer.pending(partitions=[1]), 0) - - # Produce 10 messages to partitions 0 and 1 - self.send_messages(0, range(0, 10)) - self.send_messages(1, range(10, 20)) - - consumer = self.consumer() - - self.assertEqual(consumer.pending(), 20) - self.assertEqual(consumer.pending(partitions=[0]), 10) - self.assertEqual(consumer.pending(partitions=[1]), 10) - - # move to last message, so one partition should have 1 pending - # message and other 0 - consumer.seek(-1, 2) - self.assertEqual(consumer.pending(), 1) - - pending_part1 = consumer.pending(partitions=[0]) - pending_part2 = consumer.pending(partitions=[1]) - self.assertEquals(set([0, 1]), set([pending_part1, pending_part2])) - consumer.stop() - - @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - def test_multi_process_consumer(self): - # Produce 100 messages to partitions 0 and 1 - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - consumer = self.consumer(consumer = MultiProcessConsumer) - - self.assert_message_count([ message for message in consumer ], 200) - - consumer.stop() - - @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - def test_multi_process_consumer_blocking(self): - consumer = self.consumer(consumer = MultiProcessConsumer) - - # Ask for 5 messages, No messages in queue, block 1 second - with Timer() as t: - messages = consumer.get_messages(block=True, timeout=1) - self.assert_message_count(messages, 0) - - self.assertGreaterEqual(t.interval, 1) - - # Send 10 messages - self.send_messages(0, range(0, 10)) - - # Ask for 5 messages, 10 messages in queue, block 0 seconds - with Timer() as t: - messages = consumer.get_messages(count=5, block=True, timeout=5) - self.assert_message_count(messages, 5) - self.assertLessEqual(t.interval, 1) - - # Ask for 10 messages, 5 in queue, block 1 second - with Timer() as t: - messages = consumer.get_messages(count=10, block=True, timeout=1) - self.assert_message_count(messages, 5) - self.assertGreaterEqual(t.interval, 1) - - # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 - # second, get at least one back, no blocking - self.send_messages(0, range(0, 5)) - with Timer() as t: - messages = consumer.get_messages(count=10, block=1, timeout=1) - received_message_count = len(messages) - self.assertGreaterEqual(received_message_count, 1) - self.assert_message_count(messages, received_message_count) - self.assertLessEqual(t.interval, 1) - - consumer.stop() - - @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - def test_multi_proc_pending(self): - self.send_messages(0, range(0, 10)) - self.send_messages(1, range(10, 20)) - - # set group to None and auto_commit to False to avoid interactions w/ - # offset commit/fetch apis - consumer = MultiProcessConsumer(self.client, None, self.topic, - auto_commit=False, iter_timeout=0) - - self.assertEqual(consumer.pending(), 20) - self.assertEqual(consumer.pending(partitions=[0]), 10) - self.assertEqual(consumer.pending(partitions=[1]), 10) - - consumer.stop() - - @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_multi_process_consumer_load_initial_offsets(self): - self.send_messages(0, range(0, 10)) - self.send_messages(1, range(10, 20)) - - # Create 1st consumer and change offsets - consumer = self.consumer(group='test_multi_process_consumer_load_initial_offsets') - self.assertEqual(consumer.offsets, {0: 0, 1: 0}) - consumer.offsets.update({0:5, 1:15}) - # Update counter after manual offsets update - consumer.count_since_commit += 1 - consumer.commit() - - # Create 2nd consumer and check initial offsets - consumer = self.consumer(consumer = MultiProcessConsumer, - group='test_multi_process_consumer_load_initial_offsets', - auto_commit=False) - self.assertEqual(consumer.offsets, {0: 5, 1: 15}) - - def test_large_messages(self): - # Produce 10 "normal" size messages - small_messages = self.send_messages(0, [ str(x) for x in range(10) ]) - - # Produce 10 messages that are large (bigger than default fetch size) - large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ]) - - # Brokers prior to 0.11 will return the next message - # if it is smaller than max_bytes (called buffer_size in SimpleConsumer) - # Brokers 0.11 and later that store messages in v2 format - # internally will return the next message only if the - # full MessageSet is smaller than max_bytes. - # For that reason, we set the max buffer size to a little more - # than the size of all large messages combined - consumer = self.consumer(max_buffer_size=60000) - - expected_messages = set(small_messages + large_messages) - actual_messages = set([x.message.value for x in consumer - if not isinstance(x.message, PartialMessage)]) - self.assertEqual(expected_messages, actual_messages) - - consumer.stop() - - def test_huge_messages(self): - huge_message, = self.send_messages(0, [ - create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)), - ]) - - # Create a consumer with the default buffer size - consumer = self.consumer() - - # This consumer fails to get the message - with self.assertRaises(ConsumerFetchSizeTooSmall): - consumer.get_message(False, 0.1) - - consumer.stop() - - # Create a consumer with no fetch size limit - big_consumer = self.consumer( - max_buffer_size = None, - partitions = [0], - ) - - # Seek to the last message - big_consumer.seek(-1, 2) - - # Consume giant message successfully - message = big_consumer.get_message(block=False, timeout=10) - self.assertIsNotNone(message) - self.assertEqual(message.message.value, huge_message) - - big_consumer.stop() - - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_offset_behavior__resuming_behavior(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Start a consumer - consumer1 = self.consumer( - group='test_offset_behavior__resuming_behavior', - auto_commit=True, - auto_commit_every_t = None, - auto_commit_every_n = 20, - ) - - # Grab the first 195 messages - output_msgs1 = [ consumer1.get_message().message.value for _ in range(195) ] - self.assert_message_count(output_msgs1, 195) - - # The total offset across both partitions should be at 180 - consumer2 = self.consumer( - group='test_offset_behavior__resuming_behavior', - auto_commit=True, - auto_commit_every_t = None, - auto_commit_every_n = 20, - ) - - # 181-200 - self.assert_message_count([ message for message in consumer2 ], 20) - - consumer1.stop() - consumer2.stop() - - @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_multi_process_offset_behavior__resuming_behavior(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Start a consumer - consumer1 = self.consumer( - consumer=MultiProcessConsumer, - group='test_multi_process_offset_behavior__resuming_behavior', - auto_commit=True, - auto_commit_every_t = None, - auto_commit_every_n = 20, - ) - - # Grab the first 195 messages - output_msgs1 = [] - idx = 0 - for message in consumer1: - output_msgs1.append(message.message.value) - idx += 1 - if idx >= 195: - break - self.assert_message_count(output_msgs1, 195) - - # The total offset across both partitions should be at 180 - consumer2 = self.consumer( - consumer=MultiProcessConsumer, - group='test_multi_process_offset_behavior__resuming_behavior', - auto_commit=True, - auto_commit_every_t = None, - auto_commit_every_n = 20, - ) - - # 181-200 - self.assert_message_count([ message for message in consumer2 ], 20) - - consumer1.stop() - consumer2.stop() - - # TODO: Make this a unit test -- should not require integration - def test_fetch_buffer_size(self): - - # Test parameters (see issue 135 / PR 136) - TEST_MESSAGE_SIZE=1048 - INIT_BUFFER_SIZE=1024 - MAX_BUFFER_SIZE=2048 - assert TEST_MESSAGE_SIZE > INIT_BUFFER_SIZE - assert TEST_MESSAGE_SIZE < MAX_BUFFER_SIZE - assert MAX_BUFFER_SIZE == 2 * INIT_BUFFER_SIZE - - self.send_messages(0, [ "x" * 1048 ]) - self.send_messages(1, [ "x" * 1048 ]) - - consumer = self.consumer(buffer_size=1024, max_buffer_size=2048) - messages = [ message for message in consumer ] - self.assertEqual(len(messages), 2) - - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages): TIMEOUT_MS = 500 |