summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-01-12 23:31:38 -0800
committerDana Powers <dana.powers@gmail.com>2015-01-12 23:31:38 -0800
commit8178b9d61ca5f8ea3c08d7336d4c645b0156f560 (patch)
tree641b05e0b8915d5c46f50f48c6dc716058773191
parent02c2b469003e2ddcb051dbb4d95977137050c19f (diff)
parentd6f1a0a6e4546145501ada46dcfa72de1c575111 (diff)
downloadkafka-python-8178b9d61ca5f8ea3c08d7336d4c645b0156f560.tar.gz
Merge pull request #288 from alexcb/master
Randomize start by default for SimpleProducer
-rw-r--r--kafka/producer/simple.py2
-rw-r--r--test/test_producer_integration.py36
2 files changed, 22 insertions, 16 deletions
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index a10fa8c..401b79b 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -42,7 +42,7 @@ class SimpleProducer(Producer):
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- random_start=False):
+ random_start=True):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 4331d23..19d28bd 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -142,7 +142,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_simple_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
- producer = SimpleProducer(self.client)
+ producer = SimpleProducer(self.client, random_start=False)
# Goes to first partition, randomly.
resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
@@ -165,7 +165,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_produce__new_topic_fails_with_reasonable_error(self):
new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
- producer = SimpleProducer(self.client)
+ producer = SimpleProducer(self.client, random_start=False)
# At first it doesn't exist
with self.assertRaises((UnknownTopicOrPartitionError,
@@ -174,7 +174,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_producer_random_order(self):
- producer = SimpleProducer(self.client, random_start = True)
+ producer = SimpleProducer(self.client, random_start=True)
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
resp2 = producer.send_messages(self.topic, self.msg("three"))
resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
@@ -184,7 +184,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_producer_ordered_start(self):
- producer = SimpleProducer(self.client, random_start = False)
+ producer = SimpleProducer(self.client, random_start=False)
resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
resp2 = producer.send_messages(self.topic, self.msg("three"))
resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
@@ -249,7 +249,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_acks_none(self):
start_offset0 = self.current_offset(self.topic, 0)
- producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED)
+ producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_NOT_REQUIRED,
+ random_start=False)
resp = producer.send_messages(self.topic, self.msg("one"))
self.assertEqual(len(resp), 0)
@@ -260,7 +261,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_acks_local_write(self):
start_offset0 = self.current_offset(self.topic, 0)
- producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
+ producer = SimpleProducer(self.client, req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
+ random_start=False)
resp = producer.send_messages(self.topic, self.msg("one"))
self.assert_produce_response(resp, start_offset0)
@@ -274,7 +276,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
producer = SimpleProducer(
self.client,
- req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
+ req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT,
+ random_start=False)
resp = producer.send_messages(self.topic, self.msg("one"))
self.assert_produce_response(resp, start_offset0)
@@ -287,10 +290,12 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset0 = self.current_offset(self.topic, 0)
start_offset1 = self.current_offset(self.topic, 1)
- producer = SimpleProducer(self.client,
- batch_send=True,
- batch_send_every_n=5,
- batch_send_every_t=20)
+ producer = SimpleProducer(
+ self.client,
+ batch_send=True,
+ batch_send_every_n=5,
+ batch_send_every_t=20,
+ random_start=False)
# Send 5 messages and do a fetch
resp = producer.send_messages(self.topic,
@@ -337,9 +342,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset1 = self.current_offset(self.topic, 1)
producer = SimpleProducer(self.client,
- batch_send=True,
- batch_send_every_n=100,
- batch_send_every_t=5)
+ batch_send=True,
+ batch_send_every_n=100,
+ batch_send_every_t=5,
+ random_start=False)
# Send 5 messages and do a fetch
resp = producer.send_messages(self.topic,
@@ -387,7 +393,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_async_simple_producer(self):
start_offset0 = self.current_offset(self.topic, 0)
- producer = SimpleProducer(self.client, async=True)
+ producer = SimpleProducer(self.client, async=True, random_start=False)
resp = producer.send_messages(self.topic, self.msg("one"))
self.assertEqual(len(resp), 0)