diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-07 18:52:05 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-07 19:09:32 -0700 |
commit | 715425c639a476139065689afde3d255a07d6f96 (patch) | |
tree | 0ef2cd875c97c8ca867d89328d6fd5fec7dfcbe8 /test/test_producer_integration.py | |
parent | a99384f4c601d127ab1c4fe5b272ea5c07fd695d (diff) | |
parent | be23042ecd9ab330886745ccc9ec9e3a0039836f (diff) | |
download | kafka-python-715425c639a476139065689afde3d255a07d6f96.tar.gz |
Merge pull request #227 from wizzat-feature/py3
Python 3 Support
Conflicts:
kafka/producer.py
test/test_client.py
test/test_client_integration.py
test/test_codec.py
test/test_consumer.py
test/test_consumer_integration.py
test/test_failover_integration.py
test/test_producer.py
test/test_producer_integration.py
test/test_protocol.py
test/test_util.py
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r-- | test/test_producer_integration.py | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 19d3a6d..125df2c 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -7,16 +7,16 @@ from kafka import ( create_message, create_gzip_message, create_snappy_message, RoundRobinPartitioner, HashedPartitioner ) +from kafka.codec import has_snappy from kafka.common import ( FetchRequest, ProduceRequest, UnknownTopicOrPartitionError ) -from kafka.codec import has_snappy from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import KafkaIntegrationTestCase, kafka_versions class TestKafkaProducerIntegration(KafkaIntegrationTestCase): - topic = 'produce_topic' + topic = b'produce_topic' @classmethod def setUpClass(cls): # noqa @@ -39,13 +39,15 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset = self.current_offset(self.topic, 0) self.assert_produce_request( - [ create_message("Test message %d" % i) for i in range(100) ], + [create_message(("Test message %d" % i).encode('utf-8')) + for i in range(100)], start_offset, 100, ) self.assert_produce_request( - [ create_message("Test message %d" % i) for i in range(100) ], + [create_message(("Test message %d" % i).encode('utf-8')) + for i in range(100)], start_offset+100, 100, ) @@ -55,7 +57,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset = self.current_offset(self.topic, 0) self.assert_produce_request( - [ create_message("Test message %d" % i) for i in range(10000) ], + [create_message(("Test message %d" % i).encode('utf-8')) + for i in range(10000)], start_offset, 10000, ) @@ -64,8 +67,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): def test_produce_many_gzip(self): start_offset = self.current_offset(self.topic, 0) - message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) - message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) + message1 = create_gzip_message([ + ("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)]) + message2 = create_gzip_message([ + ("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)]) self.assert_produce_request( [ message1, message2 ], @@ -92,8 +97,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): msg_count = 1+100 messages = [ - create_message("Just a plain message"), - create_gzip_message(["Gzipped %d" % i for i in range(100)]), + create_message(b"Just a plain message"), + create_gzip_message([ + ("Gzipped %d" % i).encode('utf-8') for i in range(100)]), ] # All snappy integration tests fail with nosnappyjava @@ -108,14 +114,18 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset = self.current_offset(self.topic, 0) self.assert_produce_request([ - create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) + create_gzip_message([ + ("Gzipped batch 1, message %d" % i).encode('utf-8') + for i in range(50000)]) ], start_offset, 50000, ) self.assert_produce_request([ - create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) + create_gzip_message([ + ("Gzipped batch 1, message %d" % i).encode('utf-8') + for i in range(50000)]) ], start_offset+50000, 50000, @@ -151,7 +161,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_produce__new_topic_fails_with_reasonable_error(self): - new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())) + new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8') producer = SimpleProducer(self.client) # At first it doesn't exist |