author    Jeff Widman <jeff@jeffwidman.com>  2019-10-11 12:03:22 -0700
committer GitHub <noreply@github.com>        2019-10-11 12:03:22 -0700
commit    3631bfa009a28767a2057c9beee470acaa6597d5 (patch)
tree      e10b73861a33d83a95b6496ef3074ee3caeaae41 /test/test_consumer_integration.py
parent    6d3800ca9f45fd953689a1787fc90a5e566e34ea (diff)
download  kafka-python-3631bfa009a28767a2057c9beee470acaa6597d5.tar.gz
Remove SimpleClient, Producer, Consumer, Unittest (#1196)
In the 2.0 release, we're removing:

* `SimpleClient`
* `SimpleConsumer`
* `SimpleProducer`
* Old partitioners used by `SimpleProducer`; these are superseded by the `DefaultPartitioner`

These have been deprecated for several years in favor of `KafkaClient` / `KafkaConsumer` / `KafkaProducer`. Since 2.0 allows breaking changes, we are removing the deprecated classes.

Additionally, since the only usage of `unittest` was in tests for these old Simple* clients, this also drops `unittest` from the library. All tests now run under `pytest`.
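For anyone migrating off the removed classes, here is a minimal sketch of the equivalent calls against the surviving `KafkaProducer` / `KafkaConsumer` API (the broker address, topic, and group id below are illustrative placeholders, not values taken from this commit):

    from kafka import KafkaConsumer, KafkaProducer

    # SimpleProducer -> KafkaProducer: send raw bytes to a topic
    producer = KafkaProducer(bootstrap_servers='localhost:9092')  # placeholder broker
    producer.send('my-topic', b'some message bytes')
    producer.flush()  # block until buffered records are delivered

    # SimpleConsumer -> KafkaConsumer: iterate over records
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers='localhost:9092',
        group_id='my-group',             # replaces SimpleConsumer's `group` argument
        auto_offset_reset='earliest',    # roughly SimpleConsumer's 'smallest'
        consumer_timeout_ms=1000,        # stop iterating when idle, like `iter_timeout`
    )
    for record in consumer:
        print(record.partition, record.offset, record.value)

Custom partitioning, previously handled by the removed `SimpleProducer` partitioners, is configured via `KafkaProducer`'s `partitioner` option, which defaults to the `DefaultPartitioner` mentioned above.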
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r--  test/test_consumer_integration.py  498
1 file changed, 3 insertions(+), 495 deletions(-)
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index d6fd41c..6e6bc94 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,29 +1,17 @@
 import logging
-import os
 import time
 
 from mock import patch
 import pytest
 from kafka.vendor.six.moves import range
 
-from . import unittest
-from kafka import (
-    KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message,
-    create_gzip_message, KafkaProducer
-)
 import kafka.codec
-from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
 from kafka.errors import (
-    ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
-    KafkaTimeoutError, UnsupportedCodecError
-)
-from kafka.protocol.message import PartialMessage
-from kafka.structs import (
-    ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
+    KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError
 )
+from kafka.structs import TopicPartition, OffsetAndTimestamp
 
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, Timer, assert_message_count, env_kafka_version, random_string
+from test.testutil import Timer, assert_message_count, env_kafka_version, random_string
 
 
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@@ -63,486 +51,6 @@ def test_kafka_consumer_unsupported_encoding(
         consumer.poll(timeout_ms=2000)
 
 
-class TestConsumerIntegration(KafkaIntegrationTestCase):
-    maxDiff = None
-
-    @classmethod
-    def setUpClass(cls):
-        if not os.environ.get('KAFKA_VERSION'):
-            return
-
-        cls.zk = ZookeeperFixture.instance()
-        chroot = random_string(10)
-        cls.server1 = KafkaFixture.instance(0, cls.zk,
-                                            zk_chroot=chroot)
-        cls.server2 = KafkaFixture.instance(1, cls.zk,
-                                            zk_chroot=chroot)
-
-        cls.server = cls.server1 # Bootstrapping server
-
-    @classmethod
-    def tearDownClass(cls):
-        if not os.environ.get('KAFKA_VERSION'):
-            return
-
-        cls.server1.close()
-        cls.server2.close()
-        cls.zk.close()
-
-    def send_messages(self, partition, messages):
-        messages = [ create_message(self.msg(str(msg))) for msg in messages ]
-        produce = ProduceRequestPayload(self.topic, partition, messages = messages)
-        resp, = self.client.send_produce_request([produce])
-        self.assertEqual(resp.error, 0)
-
-        return [ x.value for x in messages ]
-
-    def send_gzip_message(self, partition, messages):
-        message = create_gzip_message([(self.msg(str(msg)), None) for msg in messages])
-        produce = ProduceRequestPayload(self.topic, partition, messages = [message])
-        resp, = self.client.send_produce_request([produce])
-        self.assertEqual(resp.error, 0)
-
-    def assert_message_count(self, messages, num_messages):
-        # Make sure we got them all
-        self.assertEqual(len(messages), num_messages)
-
-        # Make sure there are no duplicates
-        self.assertEqual(len(set(messages)), num_messages)
-
-    def consumer(self, **kwargs):
-        if os.environ['KAFKA_VERSION'] == "0.8.0":
-            # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
-            kwargs['group'] = None
-            kwargs['auto_commit'] = False
-        else:
-            kwargs.setdefault('group', None)
-            kwargs.setdefault('auto_commit', False)
-
-        consumer_class = kwargs.pop('consumer', SimpleConsumer)
-        group = kwargs.pop('group', None)
-        topic = kwargs.pop('topic', self.topic)
-
-        if consumer_class in [SimpleConsumer, MultiProcessConsumer]:
-            kwargs.setdefault('iter_timeout', 0)
-
-        return consumer_class(self.client, group, topic, **kwargs)
-
-    def kafka_consumer(self, **configs):
-        brokers = '%s:%d' % (self.server.host, self.server.port)
-        consumer = KafkaConsumer(self.topic,
-                                 bootstrap_servers=brokers,
-                                 **configs)
-        return consumer
-
-    def kafka_producer(self, **configs):
-        brokers = '%s:%d' % (self.server.host, self.server.port)
-        producer = KafkaProducer(
-            bootstrap_servers=brokers, **configs)
-        return producer
-
-    def test_simple_consumer(self):
-        self.send_messages(0, range(0, 100))
-        self.send_messages(1, range(100, 200))
-
-        # Start a consumer
-        consumer = self.consumer()
-
-        self.assert_message_count([ message for message in consumer ], 200)
-
-        consumer.stop()
-
-    def test_simple_consumer_gzip(self):
-        self.send_gzip_message(0, range(0, 100))
-        self.send_gzip_message(1, range(100, 200))
-
-        # Start a consumer
-        consumer = self.consumer()
-
-        self.assert_message_count([ message for message in consumer ], 200)
-
-        consumer.stop()
-
-    def test_simple_consumer_smallest_offset_reset(self):
-        self.send_messages(0, range(0, 100))
-        self.send_messages(1, range(100, 200))
-
-        consumer = self.consumer(auto_offset_reset='smallest')
-        # Move fetch offset ahead of 300 message (out of range)
-        consumer.seek(300, 2)
-        # Since auto_offset_reset is set to smallest we should read all 200
-        # messages from beginning.
-        self.assert_message_count([message for message in consumer], 200)
-
-    def test_simple_consumer_largest_offset_reset(self):
-        self.send_messages(0, range(0, 100))
-        self.send_messages(1, range(100, 200))
-
-        # Default largest
-        consumer = self.consumer()
-        # Move fetch offset ahead of 300 message (out of range)
-        consumer.seek(300, 2)
-        # Since auto_offset_reset is set to largest we should not read any
-        # messages.
-        self.assert_message_count([message for message in consumer], 0)
-        # Send 200 new messages to the queue
-        self.send_messages(0, range(200, 300))
-        self.send_messages(1, range(300, 400))
-        # Since the offset is set to largest we should read all the new messages.
-        self.assert_message_count([message for message in consumer], 200)
-
-    def test_simple_consumer_no_reset(self):
-        self.send_messages(0, range(0, 100))
-        self.send_messages(1, range(100, 200))
-
-        # Default largest
-        consumer = self.consumer(auto_offset_reset=None)
-        # Move fetch offset ahead of 300 message (out of range)
-        consumer.seek(300, 2)
-        with self.assertRaises(OffsetOutOfRangeError):
-            consumer.get_message()
-
-    @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-    def test_simple_consumer_load_initial_offsets(self):
-        self.send_messages(0, range(0, 100))
-        self.send_messages(1, range(100, 200))
-
-        # Create 1st consumer and change offsets
-        consumer = self.consumer(group='test_simple_consumer_load_initial_offsets')
-        self.assertEqual(consumer.offsets, {0: 0, 1: 0})
-        consumer.offsets.update({0:51, 1:101})
-        # Update counter after manual offsets update
-        consumer.count_since_commit += 1
-        consumer.commit()
-
-        # Create 2nd consumer and check initial offsets
-        consumer = self.consumer(group='test_simple_consumer_load_initial_offsets',
-                                 auto_commit=False)
-        self.assertEqual(consumer.offsets, {0: 51, 1: 101})
-
-    def test_simple_consumer__seek(self):
-        self.send_messages(0, range(0, 100))
-        self.send_messages(1, range(100, 200))
-
-        consumer = self.consumer()
-
-        # Rewind 10 messages from the end
-        consumer.seek(-10, 2)
-        self.assert_message_count([ message for message in consumer ], 10)
-
-        # Rewind 13 messages from the end
-        consumer.seek(-13, 2)
-        self.assert_message_count([ message for message in consumer ], 13)
-
-        # Set absolute offset
-        consumer.seek(100)
-        self.assert_message_count([ message for message in consumer ], 0)
-        consumer.seek(100, partition=0)
-        self.assert_message_count([ message for message in consumer ], 0)
-        consumer.seek(101, partition=1)
-        self.assert_message_count([ message for message in consumer ], 0)
-        consumer.seek(90, partition=0)
-        self.assert_message_count([ message for message in consumer ], 10)
-        consumer.seek(20, partition=1)
-        self.assert_message_count([ message for message in consumer ], 80)
-        consumer.seek(0, partition=1)
-        self.assert_message_count([ message for message in consumer ], 100)
-
-        consumer.stop()
-
-    @pytest.mark.skipif(env_kafka_version() >= (2, 0),
-                        reason="SimpleConsumer blocking does not handle PartialMessage change in kafka 2.0+")
-    def test_simple_consumer_blocking(self):
-        consumer = self.consumer()
-
-        # Ask for 5 messages, nothing in queue, block 1 second
-        with Timer() as t:
-            messages = consumer.get_messages(block=True, timeout=1)
-            self.assert_message_count(messages, 0)
-        self.assertGreaterEqual(t.interval, 1)
-
-        self.send_messages(0, range(0, 5))
-        self.send_messages(1, range(5, 10))
-
-        # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
-        with Timer() as t:
-            messages = consumer.get_messages(count=5, block=True, timeout=3)
-            self.assert_message_count(messages, 5)
-        self.assertLess(t.interval, 3)
-
-        # Ask for 10 messages, get 5 back, block 1 second
-        with Timer() as t:
-            messages = consumer.get_messages(count=10, block=True, timeout=1)
-            self.assert_message_count(messages, 5)
-        self.assertGreaterEqual(t.interval, 1)
-
-        # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
-        # second, get 5 back, no blocking
-        self.send_messages(0, range(0, 3))
-        self.send_messages(1, range(3, 5))
-        with Timer() as t:
-            messages = consumer.get_messages(count=10, block=1, timeout=1)
-            self.assert_message_count(messages, 5)
-        self.assertLessEqual(t.interval, 1)
-
-        consumer.stop()
-
-    def test_simple_consumer_pending(self):
-        # make sure that we start with no pending messages
-        consumer = self.consumer()
-        self.assertEquals(consumer.pending(), 0)
-        self.assertEquals(consumer.pending(partitions=[0]), 0)
-        self.assertEquals(consumer.pending(partitions=[1]), 0)
-
-        # Produce 10 messages to partitions 0 and 1
-        self.send_messages(0, range(0, 10))
-        self.send_messages(1, range(10, 20))
-
-        consumer = self.consumer()
-
-        self.assertEqual(consumer.pending(), 20)
-        self.assertEqual(consumer.pending(partitions=[0]), 10)
-        self.assertEqual(consumer.pending(partitions=[1]), 10)
-
-        # move to last message, so one partition should have 1 pending
-        # message and other 0
-        consumer.seek(-1, 2)
-        self.assertEqual(consumer.pending(), 1)
-
-        pending_part1 = consumer.pending(partitions=[0])
-        pending_part2 = consumer.pending(partitions=[1])
-        self.assertEquals(set([0, 1]), set([pending_part1, pending_part2]))
-        consumer.stop()
-
-    @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
-    def test_multi_process_consumer(self):
-        # Produce 100 messages to partitions 0 and 1
-        self.send_messages(0, range(0, 100))
-        self.send_messages(1, range(100, 200))
-
-        consumer = self.consumer(consumer = MultiProcessConsumer)
-
-        self.assert_message_count([ message for message in consumer ], 200)
-
-        consumer.stop()
-
-    @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
-    def test_multi_process_consumer_blocking(self):
-        consumer = self.consumer(consumer = MultiProcessConsumer)
-
-        # Ask for 5 messages, No messages in queue, block 1 second
-        with Timer() as t:
-            messages = consumer.get_messages(block=True, timeout=1)
-            self.assert_message_count(messages, 0)
-
-        self.assertGreaterEqual(t.interval, 1)
-
-        # Send 10 messages
-        self.send_messages(0, range(0, 10))
-
-        # Ask for 5 messages, 10 messages in queue, block 0 seconds
-        with Timer() as t:
-            messages = consumer.get_messages(count=5, block=True, timeout=5)
-            self.assert_message_count(messages, 5)
-        self.assertLessEqual(t.interval, 1)
-
-        # Ask for 10 messages, 5 in queue, block 1 second
-        with Timer() as t:
-            messages = consumer.get_messages(count=10, block=True, timeout=1)
-            self.assert_message_count(messages, 5)
-        self.assertGreaterEqual(t.interval, 1)
-
-        # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
-        # second, get at least one back, no blocking
-        self.send_messages(0, range(0, 5))
-        with Timer() as t:
-            messages = consumer.get_messages(count=10, block=1, timeout=1)
-            received_message_count = len(messages)
-            self.assertGreaterEqual(received_message_count, 1)
-            self.assert_message_count(messages, received_message_count)
-        self.assertLessEqual(t.interval, 1)
-
-        consumer.stop()
-
-    @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
-    def test_multi_proc_pending(self):
-        self.send_messages(0, range(0, 10))
-        self.send_messages(1, range(10, 20))
-
-        # set group to None and auto_commit to False to avoid interactions w/
-        # offset commit/fetch apis
-        consumer = MultiProcessConsumer(self.client, None, self.topic,
-                                        auto_commit=False, iter_timeout=0)
-
-        self.assertEqual(consumer.pending(), 20)
-        self.assertEqual(consumer.pending(partitions=[0]), 10)
-        self.assertEqual(consumer.pending(partitions=[1]), 10)
-
-        consumer.stop()
-
-    @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
-    @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-    def test_multi_process_consumer_load_initial_offsets(self):
-        self.send_messages(0, range(0, 10))
-        self.send_messages(1, range(10, 20))
-
-        # Create 1st consumer and change offsets
-        consumer = self.consumer(group='test_multi_process_consumer_load_initial_offsets')
-        self.assertEqual(consumer.offsets, {0: 0, 1: 0})
-        consumer.offsets.update({0:5, 1:15})
-        # Update counter after manual offsets update
-        consumer.count_since_commit += 1
-        consumer.commit()
-
-        # Create 2nd consumer and check initial offsets
-        consumer = self.consumer(consumer = MultiProcessConsumer,
-                                 group='test_multi_process_consumer_load_initial_offsets',
-                                 auto_commit=False)
-        self.assertEqual(consumer.offsets, {0: 5, 1: 15})
-
-    def test_large_messages(self):
-        # Produce 10 "normal" size messages
-        small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
-
-        # Produce 10 messages that are large (bigger than default fetch size)
-        large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ])
-
-        # Brokers prior to 0.11 will return the next message
-        # if it is smaller than max_bytes (called buffer_size in SimpleConsumer)
-        # Brokers 0.11 and later that store messages in v2 format
-        # internally will return the next message only if the
-        # full MessageSet is smaller than max_bytes.
-        # For that reason, we set the max buffer size to a little more
-        # than the size of all large messages combined
-        consumer = self.consumer(max_buffer_size=60000)
-
-        expected_messages = set(small_messages + large_messages)
-        actual_messages = set([x.message.value for x in consumer
-                               if not isinstance(x.message, PartialMessage)])
-        self.assertEqual(expected_messages, actual_messages)
-
-        consumer.stop()
-
-    def test_huge_messages(self):
-        huge_message, = self.send_messages(0, [
-            create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
-        ])
-
-        # Create a consumer with the default buffer size
-        consumer = self.consumer()
-
-        # This consumer fails to get the message
-        with self.assertRaises(ConsumerFetchSizeTooSmall):
-            consumer.get_message(False, 0.1)
-
-        consumer.stop()
-
-        # Create a consumer with no fetch size limit
-        big_consumer = self.consumer(
-            max_buffer_size = None,
-            partitions = [0],
-        )
-
-        # Seek to the last message
-        big_consumer.seek(-1, 2)
-
-        # Consume giant message successfully
-        message = big_consumer.get_message(block=False, timeout=10)
-        self.assertIsNotNone(message)
-        self.assertEqual(message.message.value, huge_message)
-
-        big_consumer.stop()
-
-    @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-    def test_offset_behavior__resuming_behavior(self):
-        self.send_messages(0, range(0, 100))
-        self.send_messages(1, range(100, 200))
-
-        # Start a consumer
-        consumer1 = self.consumer(
-            group='test_offset_behavior__resuming_behavior',
-            auto_commit=True,
-            auto_commit_every_t = None,
-            auto_commit_every_n = 20,
-        )
-
-        # Grab the first 195 messages
-        output_msgs1 = [ consumer1.get_message().message.value for _ in range(195) ]
-        self.assert_message_count(output_msgs1, 195)
-
-        # The total offset across both partitions should be at 180
-        consumer2 = self.consumer(
-            group='test_offset_behavior__resuming_behavior',
-            auto_commit=True,
-            auto_commit_every_t = None,
-            auto_commit_every_n = 20,
-        )
-
-        # 181-200
-        self.assert_message_count([ message for message in consumer2 ], 20)
-
-        consumer1.stop()
-        consumer2.stop()
-
-    @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
-    @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-    def test_multi_process_offset_behavior__resuming_behavior(self):
-        self.send_messages(0, range(0, 100))
-        self.send_messages(1, range(100, 200))
-
-        # Start a consumer
-        consumer1 = self.consumer(
-            consumer=MultiProcessConsumer,
-            group='test_multi_process_offset_behavior__resuming_behavior',
-            auto_commit=True,
-            auto_commit_every_t = None,
-            auto_commit_every_n = 20,
-        )
-
-        # Grab the first 195 messages
-        output_msgs1 = []
-        idx = 0
-        for message in consumer1:
-            output_msgs1.append(message.message.value)
-            idx += 1
-            if idx >= 195:
-                break
-        self.assert_message_count(output_msgs1, 195)
-
-        # The total offset across both partitions should be at 180
-        consumer2 = self.consumer(
-            consumer=MultiProcessConsumer,
-            group='test_multi_process_offset_behavior__resuming_behavior',
-            auto_commit=True,
-            auto_commit_every_t = None,
-            auto_commit_every_n = 20,
-        )
-
-        # 181-200
-        self.assert_message_count([ message for message in consumer2 ], 20)
-
-        consumer1.stop()
-        consumer2.stop()
-
-    # TODO: Make this a unit test -- should not require integration
-    def test_fetch_buffer_size(self):
-
-        # Test parameters (see issue 135 / PR 136)
-        TEST_MESSAGE_SIZE=1048
-        INIT_BUFFER_SIZE=1024
-        MAX_BUFFER_SIZE=2048
-        assert TEST_MESSAGE_SIZE > INIT_BUFFER_SIZE
-        assert TEST_MESSAGE_SIZE < MAX_BUFFER_SIZE
-        assert MAX_BUFFER_SIZE == 2 * INIT_BUFFER_SIZE
-
-        self.send_messages(0, [ "x" * 1048 ])
-        self.send_messages(1, [ "x" * 1048 ])
-
-        consumer = self.consumer(buffer_size=1024, max_buffer_size=2048)
-        messages = [ message for message in consumer ]
-        self.assertEqual(len(messages), 2)
-
-
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages):
     TIMEOUT_MS = 500