summaryrefslogtreecommitdiff
path: root/test/test_producer_integration.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-07 18:52:05 -0700
committerDana Powers <dana.powers@rd.io>2014-09-07 19:09:32 -0700
commit715425c639a476139065689afde3d255a07d6f96 (patch)
tree0ef2cd875c97c8ca867d89328d6fd5fec7dfcbe8 /test/test_producer_integration.py
parenta99384f4c601d127ab1c4fe5b272ea5c07fd695d (diff)
parentbe23042ecd9ab330886745ccc9ec9e3a0039836f (diff)
downloadkafka-python-715425c639a476139065689afde3d255a07d6f96.tar.gz
Merge pull request #227 from wizzat-feature/py3
Python 3 Support Conflicts: kafka/producer.py test/test_client.py test/test_client_integration.py test/test_codec.py test/test_consumer.py test/test_consumer_integration.py test/test_failover_integration.py test/test_producer.py test/test_producer_integration.py test/test_protocol.py test/test_util.py
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r--test/test_producer_integration.py34
1 files changed, 22 insertions, 12 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 19d3a6d..125df2c 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -7,16 +7,16 @@ from kafka import (
create_message, create_gzip_message, create_snappy_message,
RoundRobinPartitioner, HashedPartitioner
)
+from kafka.codec import has_snappy
from kafka.common import (
FetchRequest, ProduceRequest, UnknownTopicOrPartitionError
)
-from kafka.codec import has_snappy
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
- topic = 'produce_topic'
+ topic = b'produce_topic'
@classmethod
def setUpClass(cls): # noqa
@@ -39,13 +39,15 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request(
- [ create_message("Test message %d" % i) for i in range(100) ],
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
start_offset,
100,
)
self.assert_produce_request(
- [ create_message("Test message %d" % i) for i in range(100) ],
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
start_offset+100,
100,
)
@@ -55,7 +57,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request(
- [ create_message("Test message %d" % i) for i in range(10000) ],
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(10000)],
start_offset,
10000,
)
@@ -64,8 +67,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_produce_many_gzip(self):
start_offset = self.current_offset(self.topic, 0)
- message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
- message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
+ message1 = create_gzip_message([
+ ("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)])
+ message2 = create_gzip_message([
+ ("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)])
self.assert_produce_request(
[ message1, message2 ],
@@ -92,8 +97,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
msg_count = 1+100
messages = [
- create_message("Just a plain message"),
- create_gzip_message(["Gzipped %d" % i for i in range(100)]),
+ create_message(b"Just a plain message"),
+ create_gzip_message([
+ ("Gzipped %d" % i).encode('utf-8') for i in range(100)]),
]
# All snappy integration tests fail with nosnappyjava
@@ -108,14 +114,18 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request([
- create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ create_gzip_message([
+ ("Gzipped batch 1, message %d" % i).encode('utf-8')
+ for i in range(50000)])
],
start_offset,
50000,
)
self.assert_produce_request([
- create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ create_gzip_message([
+ ("Gzipped batch 1, message %d" % i).encode('utf-8')
+ for i in range(50000)])
],
start_offset+50000,
50000,
@@ -151,7 +161,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_produce__new_topic_fails_with_reasonable_error(self):
- new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4()))
+ new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
producer = SimpleProducer(self.client)
# At first it doesn't exist