path: root/test/test_producer_integration.py
author     Mark Roberts <wizzat@gmail.com>  2014-05-06 21:54:02 -0700
committer  Mark Roberts <wizzat@gmail.com>  2014-05-06 21:54:02 -0700
commit     b053da2203f2597801609824b510cfcc2e821d63 (patch)
tree       c0601359f26e5d9e88f5638a68241520297972f1 /test/test_producer_integration.py
parent     efcf58b84214aeda6cf79319f182407cde7833a6 (diff)
parent     3b18043821f37242bde2b186684fa05d36c61921 (diff)
download   kafka-python-b053da2203f2597801609824b510cfcc2e821d63.tar.gz
Merge branch 'master' into add_tests
kafka/client.py contained duplicate copies of the same refactor; merged. Move test/test_integration.py changes into test/test_producer_integration.py.

Conflicts:
    kafka/client.py
    servers/0.8.0/kafka-src
    test/test_integration.py
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r--  test/test_producer_integration.py | 30
1 file changed, 24 insertions(+), 6 deletions(-)
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 9c9dbd3..c69e117 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -124,19 +124,18 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         start_offset1 = self.current_offset(self.topic, 1)
         producer = SimpleProducer(self.client)
-        # Will go to partition 0
-        msg1, msg2, msg3, msg4, msg5 = [ str(uuid.uuid4()) for x in xrange(5) ]
+        # Goes to the first partition, randomly.
         resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
         self.assert_produce_response(resp, start_offset0)
-        # Will go to partition 1
+        # Goes to the next partition, randomly.
         resp = producer.send_messages(self.topic, self.msg("three"))
         self.assert_produce_response(resp, start_offset1)
         self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ])
         self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ])
-        # Will go to partition 0
+        # Goes back to the first partition because there are only two partitions
         resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
         self.assert_produce_response(resp, start_offset0+2)
         self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
@@ -144,9 +143,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         producer.stop()
     @kafka_versions("all")
-    def test_round_robin_partitioner(self):
-        msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ]
+    def test_producer_random_order(self):
+        producer = SimpleProducer(self.client, random_start=True)
+        resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
+        resp2 = producer.send_messages(self.topic, self.msg("three"))
+        resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
+
+        self.assertEqual(resp1[0].partition, resp3[0].partition)
+        self.assertNotEqual(resp1[0].partition, resp2[0].partition)
+
+    @kafka_versions("all")
+    def test_producer_ordered_start(self):
+        producer = SimpleProducer(self.client, random_start=False)
+        resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
+        resp2 = producer.send_messages(self.topic, self.msg("three"))
+        resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
+        self.assertEqual(resp1[0].partition, 0)
+        self.assertEqual(resp2[0].partition, 1)
+        self.assertEqual(resp3[0].partition, 0)
+
+    @kafka_versions("all")
+    def test_round_robin_partitioner(self):
         start_offset0 = self.current_offset(self.topic, 0)
         start_offset1 = self.current_offset(self.topic, 1)
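
The added tests pin down SimpleProducer's partition choice: with random_start=True the first batch goes to a randomly chosen partition and later batches advance round-robin, while random_start=False always starts at partition 0. The following is a minimal, self-contained sketch of that cycling behaviour; the partition_cycle helper is illustrative only and is not kafka-python's actual partitioner code.

    # Sketch of round-robin partition selection with an optional random start.
    from itertools import cycle
    from random import randint

    def partition_cycle(partitions, random_start=True):
        """Yield partition ids round-robin, optionally from a random starting point."""
        start = randint(0, len(partitions) - 1) if random_start else 0
        return cycle(partitions[start:] + partitions[:start])

    # With two partitions and random_start=False, batches land on 0, 1, 0, ...
    # which is the order test_producer_ordered_start asserts via resp*[0].partition.
    picker = partition_cycle([0, 1], random_start=False)
    assert [next(picker) for _ in range(3)] == [0, 1, 0]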