diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-10 00:48:19 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-10 01:35:01 -0700 |
commit | 61172651082365a8d3dc244d531d0c02e888a138 (patch) | |
tree | 7d512388ffb5335b8e32071834d39f0f019de57c | |
parent | 4c9a3c6b9dac952154cdab2e11892bff240f9c91 (diff) | |
download | kafka-python-61172651082365a8d3dc244d531d0c02e888a138.tar.gz |
Add KafkaClient test to show that request / response ordering is broken
-rw-r--r-- | test/test_client_integration.py | 32 |
1 files changed, 31 insertions, 1 deletions
diff --git a/test/test_client_integration.py b/test/test_client_integration.py index 585123b..a6ea8f7 100644 --- a/test/test_client_integration.py +++ b/test/test_client_integration.py @@ -2,8 +2,9 @@ import os from kafka.common import ( FetchRequest, OffsetCommitRequest, OffsetFetchRequest, - KafkaTimeoutError + KafkaTimeoutError, ProduceRequest ) +from kafka.protocol import create_message from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import KafkaIntegrationTestCase, kafka_versions @@ -49,6 +50,35 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase): with self.assertRaises(KafkaTimeoutError): self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0) + @kafka_versions('all') + def test_send_produce_request_maintains_request_response_order(self): + + self.client.ensure_topic_exists(b'foo', timeout=1) + self.client.ensure_topic_exists(b'bar', timeout=1) + + requests = [ + ProduceRequest( + b'foo', 0, + [create_message(b'a'), create_message(b'b')]), + ProduceRequest( + b'bar', 1, + [create_message(b'a'), create_message(b'b')]), + ProduceRequest( + b'foo', 1, + [create_message(b'a'), create_message(b'b')]), + ProduceRequest( + b'bar', 0, + [create_message(b'a'), create_message(b'b')]), + ] + + responses = self.client.send_produce_request(requests) + while len(responses): + request = requests.pop() + response = responses.pop() + self.assertEqual(request.topic, response.topic) + self.assertEqual(request.partition, response.partition) + + #################### # Offset Tests # #################### |