author    Dana Powers <dana.powers@rd.io>  2016-01-07 18:51:14 -0800
committer Dana Powers <dana.powers@rd.io>  2016-01-07 18:51:14 -0800
commit    828377377da43749af0d27ee256ef31bf714cf17 (patch)
tree      fbad4d4381fc4d1ea2be7ce2009214d18fbeb674 /test/test_producer.py
parent    71e7568fcb8132899f366b37c32645fd5a40dc4b (diff)
parent    9a8af1499ca425366d934487469d9977fae7fe5f (diff)
download  kafka-python-828377377da43749af0d27ee256ef31bf714cf17.tar.gz
Merge branch '0.9'

Conflicts:
	kafka/codec.py
	kafka/version.py
	test/test_producer.py
	test/test_producer_integration.py
Diffstat (limited to 'test/test_producer.py')
-rw-r--r--  test/test_producer.py  43
1 file changed, 18 insertions(+), 25 deletions(-)
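
The merge brings the 0.9-branch API renames into this test module: KafkaClient becomes SimpleClient, TopicAndPartition becomes TopicPartition, ProduceResponse becomes ProduceResponsePayload, and the hand-rolled Python 2/3 import shims are replaced with six.moves. The renamed payload types are plain namedtuples; a minimal sketch, with field names inferred from their constructor calls in this diff rather than copied from kafka/common.py:

    from collections import namedtuple

    # Field names assumed from usage in the hunks below: req.topic,
    # req.partition, and the (topic, partition, error, offset) order.
    TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
    ProduceResponsePayload = namedtuple(
        'ProduceResponsePayload', ['topic', 'partition', 'error', 'offset'])

    resp = ProduceResponsePayload('test', 0, 0, 42)
    assert (resp.error, resp.offset) == (0, 42)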
diff --git a/test/test_producer.py b/test/test_producer.py
index cc65a0a..850cb80 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -2,28 +2,21 @@
 
 import collections
 import logging
+import threading
 import time
 
 from mock import MagicMock, patch
 from . import unittest
 
-from kafka import KafkaClient, SimpleProducer, KeyedProducer
+from kafka import SimpleClient, SimpleProducer, KeyedProducer
 from kafka.common import (
     AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
-    ProduceResponse, RetryOptions, TopicAndPartition
+    ProduceResponsePayload, RetryOptions, TopicPartition
 )
 from kafka.producer.base import Producer, _send_upstream
 from kafka.protocol import CODEC_NONE
 
-import threading
-try:
-    from queue import Empty, Queue
-except ImportError:
-    from Queue import Empty, Queue
-try:
-    xrange
-except NameError:
-    xrange = range
+from six.moves import queue, xrange
 
 
 class TestKafkaProducer(unittest.TestCase):
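
The first hunk collapses the try/except import dance into six.moves, which exposes the Python 3 names on both interpreters; a minimal usage sketch:

    from six.moves import queue, xrange

    # queue resolves to the Queue module on py2 and queue on py3;
    # xrange resolves to xrange on py2 and range on py3.
    q = queue.Queue()
    for i in xrange(3):
        q.put(i)
    assert q.get() == 0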
@@ -96,12 +89,12 @@ class TestKafkaProducer(unittest.TestCase):
 
     def test_producer_sync_fail_on_error(self):
         error = FailedPayloadsError('failure')
-        with patch.object(KafkaClient, 'load_metadata_for_topics'):
-            with patch.object(KafkaClient, 'ensure_topic_exists'):
-                with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
-                    with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]):
+        with patch.object(SimpleClient, 'load_metadata_for_topics'):
+            with patch.object(SimpleClient, 'ensure_topic_exists'):
+                with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
+                    with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
 
-                        client = KafkaClient(MagicMock())
+                        client = SimpleClient(MagicMock())
                         producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
 
                         # This should not raise
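
For context on the behavior under test: with sync_fail_on_error=False a synchronous producer hands broker-level failures back to the caller instead of raising them. A minimal sketch, assuming a SimpleClient stubbed as in the hunk above ('foobar' is just an example topic):

    from kafka import SimpleProducer
    from kafka.common import FailedPayloadsError

    def demo(client):
        # client: a SimpleClient whose send path returns [FailedPayloadsError]
        quiet = SimpleProducer(client, async=False, sync_fail_on_error=False)
        (response,) = quiet.send_messages('foobar', b'test message')
        assert isinstance(response, FailedPayloadsError)  # returned, not raised

        strict = SimpleProducer(client, async=False)  # default: raise on error
        try:
            strict.send_messages('foobar', b'test message')
        except FailedPayloadsError:
            pass  # default behavior re-raises the broker error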
@@ -131,7 +124,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
 
     def setUp(self):
         self.client = MagicMock()
-        self.queue = Queue()
+        self.queue = queue.Queue()
 
     def _run_process(self, retries_limit=3, sleep_timeout=1):
         # run _send_upstream process with the queue
@@ -157,7 +150,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
 
         # lets create a queue and add 10 messages for 1 partition
         for i in range(10):
-            self.queue.put((TopicAndPartition("test", 0), "msg %i", "key %i"))
+            self.queue.put((TopicPartition("test", 0), "msg %i", "key %i"))
 
         self._run_process()
 
@@ -173,7 +166,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
         # lets create a queue and add 10 messages for 10 different partitions
         # to show how retries should work ideally
         for i in range(10):
-            self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+            self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
 
         # Mock offsets counter for closure
         offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
@@ -187,7 +180,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
                 offset = offsets[req.topic][req.partition]
                 offsets[req.topic][req.partition] += len(req.messages)
                 responses.append(
-                    ProduceResponse(req.topic, req.partition, 0, offset)
+                    ProduceResponsePayload(req.topic, req.partition, 0, offset)
                 )
             return responses
 
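
The side_effect closure above is the test's stand-in for a healthy broker: it tracks per-partition offsets so each produce request is acknowledged at the next offset. The same pattern, pulled out as a self-contained sketch (the mock wiring mirrors the test's setUp):

    import collections
    from mock import MagicMock
    from kafka.common import ProduceResponsePayload

    client = MagicMock()
    offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))

    def send_side_effect(reqs, *args, **kwargs):
        # Acknowledge every request at the current offset, then advance
        # the counter by the number of messages in the request.
        responses = []
        for req in reqs:
            offset = offsets[req.topic][req.partition]
            offsets[req.topic][req.partition] += len(req.messages)
            responses.append(
                ProduceResponsePayload(req.topic, req.partition, 0, offset))
        return responses

    client.send_produce_request.side_effect = send_side_effect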
@@ -207,7 +200,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
         # lets create a queue and add 10 messages for 10 different partitions
         # to show how retries should work ideally
         for i in range(10):
-            self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))
+            self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i))
 
         def send_side_effect(reqs, *args, **kwargs):
             return [FailedPayloadsError(req) for req in reqs]
@@ -227,7 +220,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
     def test_async_producer_not_leader(self):
 
         for i in range(10):
-            self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
+            self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
 
         # Mock offsets counter for closure
         offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
@@ -235,8 +228,8 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
         def send_side_effect(reqs, *args, **kwargs):
             if self.client.is_first_time:
                 self.client.is_first_time = False
-                return [ProduceResponse(req.topic, req.partition,
-                        NotLeaderForPartitionError.errno, -1)
+                return [ProduceResponsePayload(req.topic, req.partition,
+                        NotLeaderForPartitionError.errno, -1)
                 for req in reqs]
 
             responses = []
@@ -244,7 +237,7 @@ class TestKafkaProducerSendUpstream(unittest.TestCase):
                 offset = offsets[req.topic][req.partition]
                 offsets[req.topic][req.partition] += len(req.messages)
                 responses.append(
-                    ProduceResponse(req.topic, req.partition, 0, offset)
+                    ProduceResponsePayload(req.topic, req.partition, 0, offset)
                 )
             return responses
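
The last two hunks sit inside test_async_producer_not_leader: the first produce attempt is answered with NotLeaderForPartition error codes, and every retry is acknowledged normally, so one round of retries should drain the queue. The fail-once-then-succeed shape, as a standalone sketch (a dict flag stands in for the test's self.client.is_first_time):

    from kafka.common import NotLeaderForPartitionError, ProduceResponsePayload

    state = {'first_time': True}

    def send_side_effect(reqs, *args, **kwargs):
        if state['first_time']:
            # First call: every partition reports a not-leader error.
            state['first_time'] = False
            return [ProduceResponsePayload(req.topic, req.partition,
                                           NotLeaderForPartitionError.errno, -1)
                    for req in reqs]
        # Retries: acknowledge everything at offset 0.
        return [ProduceResponsePayload(req.topic, req.partition, 0, 0)
                for req in reqs]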