diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-05-15 12:58:34 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-06-03 11:22:49 +0300 |
commit | a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb (patch) | |
tree | 0d2746735852a9e3b1f6cebfbd2e33e237ff2aea /test/test_producer.py | |
parent | 4c682f3d4da6c5af8bfbb00700c431a272b37dc1 (diff) | |
download | kafka-python-a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb.tar.gz |
Improve async producer code: logic and style fixes
- send_producer_request with fail_on_error=False to retry failed reqs only
- using an internal dict with with namedtuple keys for retry counters
- refresh metadata on refresh_error irrespective to retries options
- removed infinite retries (retry_options.limit=None) as an over-feature
- separate producer init args for retries options (limit,backoff,on_timeouts)
- AsyncProducerQueueFull returns a list of failed messages
- producer tests improved thanks to @rogaha and @toli
Diffstat (limited to 'test/test_producer.py')
-rw-r--r-- | test/test_producer.py | 53 |
1 files changed, 20 insertions, 33 deletions
diff --git a/test/test_producer.py b/test/test_producer.py index 3004c2d..a2ba877 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -17,6 +17,10 @@ try: from queue import Empty, Queue except ImportError: from Queue import Empty, Queue +try: + xrange +except NameError: + xrange = range class TestKafkaProducer(unittest.TestCase): @@ -52,7 +56,8 @@ class TestKafkaProducer(unittest.TestCase): producer.send_messages(topic, b'hi') assert client.send_produce_request.called - def test_producer_async_queue_overfilled_batch_send(self): + @patch('kafka.producer.base._send_upstream') + def test_producer_async_queue_overfilled_batch_send(self, mock): queue_size = 2 producer = Producer(MagicMock(), batch_send=True, async_queue_maxsize=queue_size) @@ -64,8 +69,12 @@ class TestKafkaProducer(unittest.TestCase): with self.assertRaises(AsyncProducerQueueFull): message_list = [message] * (queue_size + 1) producer.send_messages(topic, partition, *message_list) + self.assertEqual(producer.queue.qsize(), queue_size) + for _ in xrange(producer.queue.qsize()): + producer.queue.get() - def test_producer_async_queue_overfilled(self): + @patch('kafka.producer.base._send_upstream') + def test_producer_async_queue_overfilled(self, mock): queue_size = 2 producer = Producer(MagicMock(), async=True, async_queue_maxsize=queue_size) @@ -77,7 +86,9 @@ class TestKafkaProducer(unittest.TestCase): with self.assertRaises(AsyncProducerQueueFull): message_list = [message] * (queue_size + 1) producer.send_messages(topic, partition, *message_list) - + self.assertEqual(producer.queue.qsize(), queue_size) + for _ in xrange(producer.queue.qsize()): + producer.queue.get() class TestKafkaProducerSendUpstream(unittest.TestCase): @@ -121,7 +132,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): # 3 batches of 3 msgs each + 1 batch of 1 message self.assertEqual(self.client.send_produce_request.call_count, 4) - def test_first_send_failed(self): # lets create a queue and add 10 messages for 10 different partitions @@ -133,7 +143,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): def send_side_effect(reqs, *args, **kwargs): if self.client.is_first_time: self.client.is_first_time = False - raise FailedPayloadsError(reqs) + return [[FailedPayloadsError(reqs)]] + return [] self.client.send_produce_request.side_effect = send_side_effect @@ -154,7 +165,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i)) def send_side_effect(reqs, *args, **kwargs): - raise FailedPayloadsError(reqs) + return [[FailedPayloadsError(reqs)]] self.client.send_produce_request.side_effect = send_side_effect @@ -168,30 +179,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed self.assertEqual(self.client.send_produce_request.call_count, 16) - def test_with_unlimited_retries(self): - - # lets create a queue and add 10 messages for 10 different partitions - # to show how retries should work ideally - for i in range(10): - self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) - - def send_side_effect(reqs, *args, **kwargs): - raise FailedPayloadsError(reqs) - - self.client.send_produce_request.side_effect = send_side_effect - - self._run_process(None) - - # the queue should have 7 elements - # 3 batches of 1 msg each were retried all this time - self.assertEqual(self.queue.empty(), False) - try: - for i in range(7): - self.queue.get(timeout=0.01) - except Empty: - self.fail("Should be 7 elems in the queue") - self.assertEqual(self.queue.empty(), True) - - # 1s / 50ms of backoff = 20 times max - calls = self.client.send_produce_request.call_count - self.assertTrue(calls > 10 & calls <= 20) + def tearDown(self): + for _ in xrange(self.queue.qsize()): + self.queue.get() |