summaryrefslogtreecommitdiff
path: root/test/test_producer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-09 09:56:38 -0700
committerDana Powers <dana.powers@rd.io>2015-06-09 11:25:17 -0700
commitc75b84eb7a7e81947e4d785dc871fee05e350476 (patch)
treedf973f2ef6a2706d86b7ea7734c6d6f9ce3bb65c /test/test_producer.py
parentfe382a55b253e2c0c4f66052ced1714dbdab65ae (diff)
downloadkafka-python-c75b84eb7a7e81947e4d785dc871fee05e350476.tar.gz
Add Unit test for async producer leader change; return ProduceResponse on success
Diffstat (limited to 'test/test_producer.py')
-rw-r--r--test/test_producer.py66
1 files changed, 56 insertions, 10 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index c12af02..c7bdfdb 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,15 +1,17 @@
# -*- coding: utf-8 -*-
-import time
+import collections
import logging
+import time
from mock import MagicMock, patch
from . import unittest
-from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions
-from kafka.common import AsyncProducerQueueFull
-from kafka.producer.base import Producer
-from kafka.producer.base import _send_upstream
+from kafka.common import (
+ AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
+ ProduceResponse, RetryOptions, TopicAndPartition
+)
+from kafka.producer.base import Producer, _send_upstream
from kafka.protocol import CODEC_NONE
import threading
@@ -122,12 +124,21 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
for i in range(10):
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+ # Mock offsets counter for closure
+ offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
self.client.is_first_time = True
def send_side_effect(reqs, *args, **kwargs):
if self.client.is_first_time:
self.client.is_first_time = False
return [FailedPayloadsError(req) for req in reqs]
- return []
+ responses = []
+ for req in reqs:
+ offset = offsets[req.topic][req.partition]
+ offsets[req.topic][req.partition] += len(req.messages)
+ responses.append(
+ ProduceResponse(req.topic, req.partition, 0, offset)
+ )
+ return responses
self.client.send_produce_request.side_effect = send_side_effect
@@ -136,8 +147,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# the queue should be void at the end of the test
self.assertEqual(self.queue.empty(), True)
- # there should be 5 non-void cals: 1st failed batch of 3 msgs
- # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+ # there should be 5 non-void calls: 1st failed batch of 3 msgs
+ # plus 3 batches of 3 msgs each + 1 batch of 1 message
self.assertEqual(self.client.send_produce_request.call_count, 5)
def test_with_limited_retries(self):
@@ -157,11 +168,46 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# the queue should be void at the end of the test
self.assertEqual(self.queue.empty(), True)
- # there should be 16 non-void cals:
+ # there should be 16 non-void calls:
# 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
- # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
+ # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
self.assertEqual(self.client.send_produce_request.call_count, 16)
+ def test_async_producer_not_leader(self):
+
+ for i in range(10):
+ self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+
+ # Mock offsets counter for closure
+ offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
+ self.client.is_first_time = True
+ def send_side_effect(reqs, *args, **kwargs):
+ if self.client.is_first_time:
+ self.client.is_first_time = False
+ return [ProduceResponse(req.topic, req.partition,
+ NotLeaderForPartitionError.errno, -1)
+ for req in reqs]
+
+ responses = []
+ for req in reqs:
+ offset = offsets[req.topic][req.partition]
+ offsets[req.topic][req.partition] += len(req.messages)
+ responses.append(
+ ProduceResponse(req.topic, req.partition, 0, offset)
+ )
+ return responses
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(2)
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 5 non-void calls: 1st failed batch of 3 msgs
+ # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+ self.assertEqual(self.client.send_produce_request.call_count, 5)
+
def tearDown(self):
for _ in xrange(self.queue.qsize()):
self.queue.get()