path: root/test/test_producer.py
author     Viktor Shlapakov <vshlapakov@gmail.com>  2015-05-15 12:58:34 +0300
committer  Viktor Shlapakov <vshlapakov@gmail.com>  2015-06-03 11:22:49 +0300
commit     a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb (patch)
tree       0d2746735852a9e3b1f6cebfbd2e33e237ff2aea /test/test_producer.py
parent     4c682f3d4da6c5af8bfbb00700c431a272b37dc1 (diff)
download   kafka-python-a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb.tar.gz
Improve async producer code: logic and style fixes
- call send_produce_request with fail_on_error=False so that only the failed requests are retried
- use an internal dict with namedtuple keys for the retry counters
- refresh metadata on refresh_error irrespective of the retry options
- removed infinite retries (retry_options.limit=None) as an over-feature
- separate producer init args for the retry options (limit, backoff, on_timeouts)
- AsyncProducerQueueFull returns a list of the failed messages
- producer tests improved thanks to @rogaha and @toli
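
Below is a minimal, self-contained sketch of the retry bookkeeping described above. It is not the kafka-python implementation: the send_batch callable and its return shape are illustrative assumptions standing in for KafkaClient.send_produce_request(..., fail_on_error=False), which the producer calls and whose responses it inspects for failures.

from collections import defaultdict, namedtuple

# Illustrative stand-in; the real type comes from kafka.common.
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])

def retry_failed_only(send_batch, requests, retry_limit=3):
    # send_batch is a hypothetical callable that returns the *failed* requests
    # instead of raising, mirroring the fail_on_error=False idea above.
    attempts = defaultdict(int)   # namedtuple key -> retry counter
    pending = list(requests)
    while pending:
        failed = send_batch(pending)
        pending = []
        for req in failed:
            attempts[req] += 1
            if attempts[req] <= retry_limit:   # finite limit; no infinite retries
                pending.append(req)
    return attempts

if __name__ == "__main__":
    reqs = [TopicAndPartition("test", p) for p in range(3)]
    calls = {"n": 0}
    def flaky_send(batch):
        # Fail everything on the first call, succeed afterwards.
        calls["n"] += 1
        return list(batch) if calls["n"] == 1 else []
    print(retry_failed_only(flaky_send, reqs))
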
Diffstat (limited to 'test/test_producer.py')
-rw-r--r--  test/test_producer.py  53
1 file changed, 20 insertions(+), 33 deletions(-)
diff --git a/test/test_producer.py b/test/test_producer.py
index 3004c2d..a2ba877 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -17,6 +17,10 @@ try:
from queue import Empty, Queue
except ImportError:
from Queue import Empty, Queue
+try:
+ xrange
+except NameError:
+ xrange = range
class TestKafkaProducer(unittest.TestCase):
@@ -52,7 +56,8 @@ class TestKafkaProducer(unittest.TestCase):
producer.send_messages(topic, b'hi')
assert client.send_produce_request.called
- def test_producer_async_queue_overfilled_batch_send(self):
+ @patch('kafka.producer.base._send_upstream')
+ def test_producer_async_queue_overfilled_batch_send(self, mock):
queue_size = 2
producer = Producer(MagicMock(), batch_send=True,
async_queue_maxsize=queue_size)
@@ -64,8 +69,12 @@ class TestKafkaProducer(unittest.TestCase):
with self.assertRaises(AsyncProducerQueueFull):
message_list = [message] * (queue_size + 1)
producer.send_messages(topic, partition, *message_list)
+ self.assertEqual(producer.queue.qsize(), queue_size)
+ for _ in xrange(producer.queue.qsize()):
+ producer.queue.get()
- def test_producer_async_queue_overfilled(self):
+ @patch('kafka.producer.base._send_upstream')
+ def test_producer_async_queue_overfilled(self, mock):
queue_size = 2
producer = Producer(MagicMock(), async=True,
async_queue_maxsize=queue_size)
@@ -77,7 +86,9 @@ class TestKafkaProducer(unittest.TestCase):
with self.assertRaises(AsyncProducerQueueFull):
message_list = [message] * (queue_size + 1)
producer.send_messages(topic, partition, *message_list)
-
+ self.assertEqual(producer.queue.qsize(), queue_size)
+ for _ in xrange(producer.queue.qsize()):
+ producer.queue.get()
class TestKafkaProducerSendUpstream(unittest.TestCase):
@@ -121,7 +132,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# 3 batches of 3 msgs each + 1 batch of 1 message
self.assertEqual(self.client.send_produce_request.call_count, 4)
-
def test_first_send_failed(self):
# lets create a queue and add 10 messages for 10 different partitions
@@ -133,7 +143,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
def send_side_effect(reqs, *args, **kwargs):
if self.client.is_first_time:
self.client.is_first_time = False
- raise FailedPayloadsError(reqs)
+ return [[FailedPayloadsError(reqs)]]
+ return []
self.client.send_produce_request.side_effect = send_side_effect
@@ -154,7 +165,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))
def send_side_effect(reqs, *args, **kwargs):
- raise FailedPayloadsError(reqs)
+ return [[FailedPayloadsError(reqs)]]
self.client.send_produce_request.side_effect = send_side_effect
@@ -168,30 +179,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
self.assertEqual(self.client.send_produce_request.call_count, 16)
- def test_with_unlimited_retries(self):
-
- # lets create a queue and add 10 messages for 10 different partitions
- # to show how retries should work ideally
- for i in range(10):
- self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
-
- def send_side_effect(reqs, *args, **kwargs):
- raise FailedPayloadsError(reqs)
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(None)
-
- # the queue should have 7 elements
- # 3 batches of 1 msg each were retried all this time
- self.assertEqual(self.queue.empty(), False)
- try:
- for i in range(7):
- self.queue.get(timeout=0.01)
- except Empty:
- self.fail("Should be 7 elems in the queue")
- self.assertEqual(self.queue.empty(), True)
-
- # 1s / 50ms of backoff = 20 times max
- calls = self.client.send_produce_request.call_count
- self.assertTrue(calls > 10 & calls <= 20)
+ def tearDown(self):
+ for _ in xrange(self.queue.qsize()):
+ self.queue.get()
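
As a closing illustration, here is a hedged sketch of the test pattern the hunks above converge on: with fail_on_error=False the mocked client returns failed payloads rather than raising, and tearDown drains the shared queue so leftover messages do not leak between tests. FailedPayloadsError below is a stand-in for kafka.common.FailedPayloadsError, and the nested-list return shape simply mirrors what the stubs above return.

import unittest
try:
    from unittest.mock import MagicMock
except ImportError:            # Python 2
    from mock import MagicMock
try:
    from queue import Queue
except ImportError:            # Python 2
    from Queue import Queue

class FailedPayloadsError(Exception):
    """Stand-in for kafka.common.FailedPayloadsError (illustration only)."""

class QueueDrainSketch(unittest.TestCase):
    def setUp(self):
        self.queue = Queue()
        self.client = MagicMock()
        # Return failures instead of raising, as the updated side_effects do.
        self.client.send_produce_request.side_effect = \
            lambda reqs, *args, **kwargs: [[FailedPayloadsError(reqs)]]

    def test_mock_returns_failures(self):
        responses = self.client.send_produce_request(["req"], fail_on_error=False)
        self.assertIsInstance(responses[0][0], FailedPayloadsError)

    def tearDown(self):
        # Drain whatever the test left behind, like the tearDown added above.
        while not self.queue.empty():
            self.queue.get()

if __name__ == "__main__":
    unittest.main()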