diff options
author | Mark Roberts <wizzat@gmail.com> | 2014-05-06 21:54:02 -0700 |
---|---|---|
committer | Mark Roberts <wizzat@gmail.com> | 2014-05-06 21:54:02 -0700 |
commit | b053da2203f2597801609824b510cfcc2e821d63 (patch) | |
tree | c0601359f26e5d9e88f5638a68241520297972f1 /test/test_producer_integration.py | |
parent | efcf58b84214aeda6cf79319f182407cde7833a6 (diff) | |
parent | 3b18043821f37242bde2b186684fa05d36c61921 (diff) | |
download | kafka-python-b053da2203f2597801609824b510cfcc2e821d63.tar.gz |
Merge branch 'master' into add_tests
kafka/client.py contained duplicate copies of same refactor, merged.
Move test/test_integration.py changes into test/test_producer_integration.
Conflicts:
kafka/client.py
servers/0.8.0/kafka-src
test/test_integration.py
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r-- | test/test_producer_integration.py | 30 |
1 files changed, 24 insertions, 6 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 9c9dbd3..c69e117 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -124,19 +124,18 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset1 = self.current_offset(self.topic, 1) producer = SimpleProducer(self.client) - # Will go to partition 0 - msg1, msg2, msg3, msg4, msg5 = [ str(uuid.uuid4()) for x in xrange(5) ] + # Goes to first partition, randomly. resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) self.assert_produce_response(resp, start_offset0) - # Will go to partition 1 + # Goes to the next partition, randomly. resp = producer.send_messages(self.topic, self.msg("three")) self.assert_produce_response(resp, start_offset1) self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two") ]) self.assert_fetch_offset(1, start_offset1, [ self.msg("three") ]) - # Will go to partition 0 + # Goes back to the first partition because there's only two partitions resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five")) self.assert_produce_response(resp, start_offset0+2) self.assert_fetch_offset(0, start_offset0, [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ]) @@ -144,9 +143,28 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): producer.stop() @kafka_versions("all") - def test_round_robin_partitioner(self): - msg1, msg2, msg3, msg4 = [ str(uuid.uuid4()) for _ in range(4) ] + def test_producer_random_order(self): + producer = SimpleProducer(self.client, random_start = True) + resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) + resp2 = producer.send_messages(self.topic, self.msg("three")) + resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five")) + + self.assertEqual(resp1[0].partition, resp3[0].partition) + self.assertNotEqual(resp1[0].partition, resp2[0].partition) + + @kafka_versions("all") + def test_producer_ordered_start(self): + producer = SimpleProducer(self.client, random_start = False) + resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) + resp2 = producer.send_messages(self.topic, self.msg("three")) + resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five")) + self.assertEqual(resp1[0].partition, 0) + self.assertEqual(resp2[0].partition, 1) + self.assertEqual(resp3[0].partition, 0) + + @kafka_versions("all") + def test_round_robin_partitioner(self): start_offset0 = self.current_offset(self.topic, 0) start_offset1 = self.current_offset(self.topic, 1) |