summaryrefslogtreecommitdiff
path: root/test/test_producer_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r--test/test_producer_integration.py36
1 files changed, 21 insertions, 15 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 4331d23..19d28bd 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -142,7 +142,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_simple_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
- producer = SimpleProducer(self.client)
+ producer = SimpleProducer(self.client, random_start=False)
# Goes to first partition, randomly.
resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
@@ -165,7 +165,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_produce__new_topic_fails_with_reasonable_error(self):
new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
- producer = SimpleProducer(self.client)
+ producer = SimpleProducer(self.client, random_start=False)
# At first it doesn't exist
with self.assertRaises((UnknownTopicOrPartitionError,
@@ -174,7 +174,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_producer_random_order(self):
- producer = SimpleProducer(self.client, random_start = True)
+ producer = SimpleProducer(self.client, random_start=True)
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
resp2 = producer.send_messages(self.topic, self.msg("three"))
resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
@@ -184,7 +184,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_producer_ordered_start(self):
- producer = SimpleProducer(self.client, random_start = False)
+ producer = SimpleProducer(self.client, random_start=False)
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
resp2 = producer.send_messages(self.topic, self.msg("three"))
resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
@@ -249,7 +249,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_acks_none(self):
start_offset0 = self.current_offset(self.topic, 0)
- producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
+ producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED,
+ random_start=False)
resp = producer.send_messages(self.topic, self.msg("one"))
self.assertEqual(len(resp), 0)
@@ -260,7 +261,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_acks_local_write(self):
start_offset0 = self.current_offset(self.topic, 0)
- producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
+ producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
+ random_start=False)
resp = producer.send_messages(self.topic, self.msg("one"))
self.assert_produce_response(resp, start_offset0)
@@ -274,7 +276,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = SimpleProducer(
self.client,
- req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
+ req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
+ random_start=False)
resp = producer.send_messages(self.topic, self.msg("one"))
self.assert_produce_response(resp, start_offset0)
@@ -287,10 +290,12 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
- producer = SimpleProducer(self.client,
- batch_send=True,
- batch_send_every_n=5,
- batch_send_every_t=20)
+ producer = SimpleProducer(
+ self.client,
+ batch_send=True,
+ batch_send_every_n=5,
+ batch_send_every_t=20,
+ random_start=False)
# Send 5 messages and do a fetch
resp = producer.send_messages(self.topic,
@@ -337,9 +342,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client,
- batch_send=True,
- batch_send_every_n=100,
- batch_send_every_t=5)
+ batch_send=True,
+ batch_send_every_n=100,
+ batch_send_every_t=5,
+ random_start=False)
# Send 5 messages and do a fetch
resp = producer.send_messages(self.topic,
@@ -387,7 +393,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_async_simple_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
- producer = SimpleProducer(self.client, async=True)
+ producer = SimpleProducer(self.client, async=True, random_start=False)
resp = producer.send_messages(self.topic, self.msg("one"))
self.assertEqual(len(resp), 0)