summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-10 00:48:19 -0700
committerDana Powers <dana.powers@rd.io>2015-06-10 01:35:01 -0700
commit61172651082365a8d3dc244d531d0c02e888a138 (patch)
tree7d512388ffb5335b8e32071834d39f0f019de57c
parent4c9a3c6b9dac952154cdab2e11892bff240f9c91 (diff)
downloadkafka-python-61172651082365a8d3dc244d531d0c02e888a138.tar.gz
Add KafkaClient test to show that request / response ordering is broken
-rw-r--r--test/test_client_integration.py32
1 files changed, 31 insertions, 1 deletions
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 585123b..a6ea8f7 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -2,8 +2,9 @@ import os
from kafka.common import (
FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
- KafkaTimeoutError
+ KafkaTimeoutError, ProduceRequest
)
+from kafka.protocol import create_message
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
@@ -49,6 +50,35 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
with self.assertRaises(KafkaTimeoutError):
self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
+ @kafka_versions('all')
+ def test_send_produce_request_maintains_request_response_order(self):
+
+ self.client.ensure_topic_exists(b'foo', timeout=1)
+ self.client.ensure_topic_exists(b'bar', timeout=1)
+
+ requests = [
+ ProduceRequest(
+ b'foo', 0,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'bar', 1,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'foo', 1,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'bar', 0,
+ [create_message(b'a'), create_message(b'b')]),
+ ]
+
+ responses = self.client.send_produce_request(requests)
+ while len(responses):
+ request = requests.pop()
+ response = responses.pop()
+ self.assertEqual(request.topic, response.topic)
+ self.assertEqual(request.partition, response.partition)
+
+
####################
# Offset Tests #
####################