summaryrefslogtreecommitdiff
path: root/test/test_producer_legacy.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_producer_legacy.py')
-rw-r--r--test/test_producer_legacy.py257
1 files changed, 0 insertions, 257 deletions
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
deleted file mode 100644
index ab80ee7..0000000
--- a/test/test_producer_legacy.py
+++ /dev/null
@@ -1,257 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import collections
-import logging
-import threading
-import time
-
-from mock import MagicMock, patch
-from . import unittest
-
-from kafka import SimpleClient, SimpleProducer, KeyedProducer
-from kafka.errors import (
- AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError)
-from kafka.producer.base import Producer, _send_upstream
-from kafka.protocol import CODEC_NONE
-from kafka.structs import (
- ProduceResponsePayload, RetryOptions, TopicPartition)
-
-from kafka.vendor.six.moves import queue, range
-
-
-class TestKafkaProducer(unittest.TestCase):
- def test_producer_message_types(self):
-
- producer = Producer(MagicMock())
- topic = b"test-topic"
- partition = 0
-
- bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
- ('a', 'tuple'), {'a': 'dict'}, None,)
- for m in bad_data_types:
- with self.assertRaises(TypeError):
- logging.debug("attempting to send message of type %s", type(m))
- producer.send_messages(topic, partition, m)
-
- good_data_types = (b'a string!',)
- for m in good_data_types:
- # This should not raise an exception
- producer.send_messages(topic, partition, m)
-
- def test_keyedproducer_message_types(self):
- client = MagicMock()
- client.get_partition_ids_for_topic.return_value = [0, 1]
- producer = KeyedProducer(client)
- topic = b"test-topic"
- key = b"testkey"
-
- bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
- ('a', 'tuple'), {'a': 'dict'},)
- for m in bad_data_types:
- with self.assertRaises(TypeError):
- logging.debug("attempting to send message of type %s", type(m))
- producer.send_messages(topic, key, m)
-
- good_data_types = (b'a string!', None,)
- for m in good_data_types:
- # This should not raise an exception
- producer.send_messages(topic, key, m)
-
- def test_topic_message_types(self):
- client = MagicMock()
-
- def partitions(topic):
- return [0, 1]
-
- client.get_partition_ids_for_topic = partitions
-
- producer = SimpleProducer(client, random_start=False)
- topic = b"test-topic"
- producer.send_messages(topic, b'hi')
- assert client.send_produce_request.called
-
- @patch('kafka.producer.base._send_upstream')
- def test_producer_async_queue_overfilled(self, mock):
- queue_size = 2
- producer = Producer(MagicMock(), async_send=True,
- async_queue_maxsize=queue_size)
-
- topic = b'test-topic'
- partition = 0
- message = b'test-message'
-
- with self.assertRaises(AsyncProducerQueueFull):
- message_list = [message] * (queue_size + 1)
- producer.send_messages(topic, partition, *message_list)
- self.assertEqual(producer.queue.qsize(), queue_size)
- for _ in range(producer.queue.qsize()):
- producer.queue.get()
-
- def test_producer_sync_fail_on_error(self):
- error = FailedPayloadsError('failure')
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- with patch.object(SimpleClient, 'ensure_topic_exists'):
- with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
- with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
-
- client = SimpleClient(MagicMock())
- producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False)
-
- # This should not raise
- (response,) = producer.send_messages('foobar', b'test message')
- self.assertEqual(response, error)
-
- producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True)
- with self.assertRaises(FailedPayloadsError):
- producer.send_messages('foobar', b'test message')
-
- def test_cleanup_is_not_called_on_stopped_producer(self):
- producer = Producer(MagicMock(), async_send=True)
- producer.stopped = True
- with patch.object(producer, 'stop') as mocked_stop:
- producer._cleanup_func(producer)
- self.assertEqual(mocked_stop.call_count, 0)
-
- def test_cleanup_is_called_on_running_producer(self):
- producer = Producer(MagicMock(), async_send=True)
- producer.stopped = False
- with patch.object(producer, 'stop') as mocked_stop:
- producer._cleanup_func(producer)
- self.assertEqual(mocked_stop.call_count, 1)
-
-
-class TestKafkaProducerSendUpstream(unittest.TestCase):
-
- def setUp(self):
- self.client = MagicMock()
- self.queue = queue.Queue()
-
- def _run_process(self, retries_limit=3, sleep_timeout=1):
- # run _send_upstream process with the queue
- stop_event = threading.Event()
- retry_options = RetryOptions(limit=retries_limit,
- backoff_ms=50,
- retry_on_timeouts=False)
- self.thread = threading.Thread(
- target=_send_upstream,
- args=(self.queue, self.client, CODEC_NONE,
- 0.3, # batch time (seconds)
- 3, # batch length
- Producer.ACK_AFTER_LOCAL_WRITE,
- Producer.DEFAULT_ACK_TIMEOUT,
- retry_options,
- stop_event))
- self.thread.daemon = True
- self.thread.start()
- time.sleep(sleep_timeout)
- stop_event.set()
-
- def test_wo_retries(self):
-
- # lets create a queue and add 10 messages for 1 partition
- for i in range(10):
- self.queue.put((TopicPartition("test", 0), "msg %i", "key %i"))
-
- self._run_process()
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 4 non-void cals:
- # 3 batches of 3 msgs each + 1 batch of 1 message
- self.assertEqual(self.client.send_produce_request.call_count, 4)
-
- def test_first_send_failed(self):
-
- # lets create a queue and add 10 messages for 10 different partitions
- # to show how retries should work ideally
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
-
- # Mock offsets counter for closure
- offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
- self.client.is_first_time = True
- def send_side_effect(reqs, *args, **kwargs):
- if self.client.is_first_time:
- self.client.is_first_time = False
- return [FailedPayloadsError(req) for req in reqs]
- responses = []
- for req in reqs:
- offset = offsets[req.topic][req.partition]
- offsets[req.topic][req.partition] += len(req.messages)
- responses.append(
- ProduceResponsePayload(req.topic, req.partition, 0, offset)
- )
- return responses
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(2)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 5 non-void calls: 1st failed batch of 3 msgs
- # plus 3 batches of 3 msgs each + 1 batch of 1 message
- self.assertEqual(self.client.send_produce_request.call_count, 5)
-
- def test_with_limited_retries(self):
-
- # lets create a queue and add 10 messages for 10 different partitions
- # to show how retries should work ideally
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i))
-
- def send_side_effect(reqs, *args, **kwargs):
- return [FailedPayloadsError(req) for req in reqs]
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(3, 3)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 16 non-void calls:
- # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
- # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
- self.assertEqual(self.client.send_produce_request.call_count, 16)
-
- def test_async_producer_not_leader(self):
-
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
-
- # Mock offsets counter for closure
- offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
- self.client.is_first_time = True
- def send_side_effect(reqs, *args, **kwargs):
- if self.client.is_first_time:
- self.client.is_first_time = False
- return [ProduceResponsePayload(req.topic, req.partition,
- NotLeaderForPartitionError.errno, -1)
- for req in reqs]
-
- responses = []
- for req in reqs:
- offset = offsets[req.topic][req.partition]
- offsets[req.topic][req.partition] += len(req.messages)
- responses.append(
- ProduceResponsePayload(req.topic, req.partition, 0, offset)
- )
- return responses
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(2)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 5 non-void calls: 1st failed batch of 3 msgs
- # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
- self.assertEqual(self.client.send_produce_request.call_count, 5)
-
- def tearDown(self):
- for _ in range(self.queue.qsize()):
- self.queue.get()