summaryrefslogtreecommitdiff
path: root/test/test_producer.py
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-03-24 22:21:13 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-06-03 11:22:47 +0300
commit5e8dc6dcf55890a4e3a214a943ecc655faed3ecc (patch)
treef5ec10c67ec6c0e48971e2f973f4768807693ca7 /test/test_producer.py
parentb0a04595c6aee7f6fcaa8927fcdfcd9a04a9b7d3 (diff)
downloadkafka-python-5e8dc6dcf55890a4e3a214a943ecc655faed3ecc.tar.gz
Fixed tests and other issues after rebase
Diffstat (limited to 'test/test_producer.py')
-rw-r--r--test/test_producer.py49
1 files changed, 19 insertions, 30 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index 51a74b5..cc58fe4 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -12,11 +12,10 @@ from kafka.producer.base import _send_upstream
from kafka.protocol import CODEC_NONE
import threading
-import multiprocessing as mp
try:
- from queue import Empty
+ from queue import Empty, Queue
except ImportError:
- from Queue import Empty
+ from Queue import Empty, Queue
class TestKafkaProducer(unittest.TestCase):
@@ -56,21 +55,13 @@ class TestKafkaProducer(unittest.TestCase):
class TestKafkaProducerSendUpstream(unittest.TestCase):
def setUp(self):
-
- # create a multiprocessing Value to store call counter
- # (magicmock counters don't work with other processes)
- self.send_calls_count = mp.Value('i', 0)
-
- def send_side_effect(*args, **kwargs):
- self.send_calls_count.value += 1
-
self.client = MagicMock()
- self.client.send_produce_request.side_effect = send_side_effect
- self.queue = mp.Queue()
+ self.queue = Queue()
def _run_process(self, retries_limit=3, sleep_timeout=1):
# run _send_upstream process with the queue
- self.process = mp.Process(
+ stop_event = threading.Event()
+ self.thread = threading.Thread(
target=_send_upstream,
args=(self.queue, self.client, CODEC_NONE,
0.3, # batch time (seconds)
@@ -78,11 +69,12 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
Producer.ACK_AFTER_LOCAL_WRITE,
Producer.DEFAULT_ACK_TIMEOUT,
50, # retry backoff (ms)
- retries_limit))
- self.process.daemon = True
- self.process.start()
+ retries_limit,
+ stop_event))
+ self.thread.daemon = True
+ self.thread.start()
time.sleep(sleep_timeout)
- self.process.terminate()
+ stop_event.set()
def test_wo_retries(self):
@@ -97,7 +89,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# there should be 4 non-void cals:
# 3 batches of 3 msgs each + 1 batch of 1 message
- self.assertEqual(self.send_calls_count.value, 4)
+ self.assertEqual(self.client.send_produce_request.call_count, 4)
+
def test_first_send_failed(self):
@@ -106,11 +99,10 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
for i in range(10):
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
- is_first_time = mp.Value('b', True)
+ self.client.is_first_time = True
def send_side_effect(reqs, *args, **kwargs):
- self.send_calls_count.value += 1
- if is_first_time.value:
- is_first_time.value = False
+ if self.client.is_first_time:
+ self.client.is_first_time = False
raise FailedPayloadsError(reqs)
self.client.send_produce_request.side_effect = send_side_effect
@@ -122,7 +114,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# there should be 5 non-void cals: 1st failed batch of 3 msgs
# + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
- self.assertEqual(self.send_calls_count.value, 5)
+ self.assertEqual(self.client.send_produce_request.call_count, 5)
def test_with_limited_retries(self):
@@ -132,7 +124,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
def send_side_effect(reqs, *args, **kwargs):
- self.send_calls_count.value += 1
raise FailedPayloadsError(reqs)
self.client.send_produce_request.side_effect = send_side_effect
@@ -145,8 +136,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
# there should be 16 non-void cals:
# 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
# 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
- self.assertEqual(self.send_calls_count.value, 16)
-
+ self.assertEqual(self.client.send_produce_request.call_count, 16)
def test_with_unlimited_retries(self):
@@ -156,7 +146,6 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
def send_side_effect(reqs, *args, **kwargs):
- self.send_calls_count.value += 1
raise FailedPayloadsError(reqs)
self.client.send_produce_request.side_effect = send_side_effect
@@ -174,5 +163,5 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
self.assertEqual(self.queue.empty(), True)
# 1s / 50ms of backoff = 20 times max
- self.assertTrue(self.send_calls_count.value > 10)
- self.assertTrue(self.send_calls_count.value <= 20)
+ calls = self.client.send_produce_request.call_count
+ self.assertTrue(calls > 10 & calls <= 20)