summaryrefslogtreecommitdiff
path: root/test/test_producer.py
diff options
context:
space:
mode:
authorEduard Iskandarov <edikexp@gmail.com>2015-01-24 00:30:50 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-06-03 11:22:48 +0300
commitf41e5f3e4befda52a20f072f85b807d77361e64d (patch)
treed78c1b80f6b31802844136cc503ef19019ea25c3 /test/test_producer.py
parentcf363089617de2d0b18cb83eba1e61adbc5d0144 (diff)
downloadkafka-python-f41e5f3e4befda52a20f072f85b807d77361e64d.tar.gz
async queue: refactored code; add one more test
Diffstat (limited to 'test/test_producer.py')
-rw-r--r--test/test_producer.py18
1 files changed, 16 insertions, 2 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index b57dfd8..627178d 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -53,15 +53,29 @@ class TestKafkaProducer(unittest.TestCase):
assert client.send_produce_request.called
@patch('kafka.producer.base.Process')
- def test_producer_batch_send_queue_overfilled(self, process_mock):
+ def test_producer_async_queue_overfilled_batch_send(self, process_mock):
queue_size = 2
producer = Producer(MagicMock(), batch_send=True,
- batch_send_queue_maxsize=queue_size)
+ async_queue_maxsize=queue_size)
topic = b'test-topic'
partition = 0
+ message = b'test-message'
+
+ with self.assertRaises(BatchQueueOverfilledError):
+ message_list = [message] * (queue_size + 1)
+ producer.send_messages(topic, partition, *message_list)
+ @patch('kafka.producer.base.Process')
+ def test_producer_async_queue_overfilled(self, process_mock):
+ queue_size = 2
+ producer = Producer(MagicMock(), async=True,
+ async_queue_maxsize=queue_size)
+
+ topic = b'test-topic'
+ partition = 0
message = b'test-message'
+
with self.assertRaises(BatchQueueOverfilledError):
message_list = [message] * (queue_size + 1)
producer.send_messages(topic, partition, *message_list)