diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-09 09:56:38 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-09 11:25:17 -0700 |
commit | c75b84eb7a7e81947e4d785dc871fee05e350476 (patch) | |
tree | df973f2ef6a2706d86b7ea7734c6d6f9ce3bb65c /test/test_producer.py | |
parent | fe382a55b253e2c0c4f66052ced1714dbdab65ae (diff) | |
download | kafka-python-c75b84eb7a7e81947e4d785dc871fee05e350476.tar.gz |
Add Unit test for async producer leader change; return ProduceResponse on success
Diffstat (limited to 'test/test_producer.py')
-rw-r--r-- | test/test_producer.py | 66 |
1 files changed, 56 insertions, 10 deletions
diff --git a/test/test_producer.py b/test/test_producer.py index c12af02..c7bdfdb 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -1,15 +1,17 @@ # -*- coding: utf-8 -*- -import time +import collections import logging +import time from mock import MagicMock, patch from . import unittest -from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions -from kafka.common import AsyncProducerQueueFull -from kafka.producer.base import Producer -from kafka.producer.base import _send_upstream +from kafka.common import ( + AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError, + ProduceResponse, RetryOptions, TopicAndPartition +) +from kafka.producer.base import Producer, _send_upstream from kafka.protocol import CODEC_NONE import threading @@ -122,12 +124,21 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): for i in range(10): self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + # Mock offsets counter for closure + offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0)) self.client.is_first_time = True def send_side_effect(reqs, *args, **kwargs): if self.client.is_first_time: self.client.is_first_time = False return [FailedPayloadsError(req) for req in reqs] - return [] + responses = [] + for req in reqs: + offset = offsets[req.topic][req.partition] + offsets[req.topic][req.partition] += len(req.messages) + responses.append( + ProduceResponse(req.topic, req.partition, 0, offset) + ) + return responses self.client.send_produce_request.side_effect = send_side_effect @@ -136,8 +147,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): # the queue should be void at the end of the test self.assertEqual(self.queue.empty(), True) - # there should be 5 non-void cals: 1st failed batch of 3 msgs - # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5 + # there should be 5 non-void calls: 1st failed batch of 3 msgs + # plus 3 batches of 3 msgs each + 1 batch of 1 message self.assertEqual(self.client.send_produce_request.call_count, 5) def test_with_limited_retries(self): @@ -157,11 +168,46 @@ class TestKafkaProducerSendUpstream(unittest.TestCase): # the queue should be void at the end of the test self.assertEqual(self.queue.empty(), True) - # there should be 16 non-void cals: + # there should be 16 non-void calls: # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg + - # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed + # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16 self.assertEqual(self.client.send_produce_request.call_count, 16) + def test_async_producer_not_leader(self): + + for i in range(10): + self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + + # Mock offsets counter for closure + offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0)) + self.client.is_first_time = True + def send_side_effect(reqs, *args, **kwargs): + if self.client.is_first_time: + self.client.is_first_time = False + return [ProduceResponse(req.topic, req.partition, + NotLeaderForPartitionError.errno, -1) + for req in reqs] + + responses = [] + for req in reqs: + offset = offsets[req.topic][req.partition] + offsets[req.topic][req.partition] += len(req.messages) + responses.append( + ProduceResponse(req.topic, req.partition, 0, offset) + ) + return responses + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(2) + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 5 non-void calls: 1st failed batch of 3 msgs + # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5 + self.assertEqual(self.client.send_produce_request.call_count, 5) + def tearDown(self): for _ in xrange(self.queue.qsize()): self.queue.get() |