summaryrefslogtreecommitdiff
path: root/test/integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/integration.py')
-rw-r--r--test/integration.py45
1 files changed, 41 insertions, 4 deletions
diff --git a/test/integration.py b/test/integration.py
index 0ce8b66..2779898 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -83,13 +83,50 @@ class IntegrationTest(unittest.TestCase):
self.kafka = KafkaClient("localhost", port)
def test_produce(self):
- req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])
+ req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'my-topic'-0"))
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
- req = ProduceRequest("my-topic", 1, [KafkaClient.create_message("testing")])
+ req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'my-topic'-1"))
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
+
+ def test_produce_consume(self):
+ message1 = KafkaClient.create_message("testing 1")
+ message2 = KafkaClient.create_message("testing 2")
+ req = ProduceRequest("test-produce-consume", 0, [message1, message2])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0"))
+ time.sleep(1)
+ req = FetchRequest("test-produce-consume", 0, 0, 1024)
+ (messages, req) = self.kafka.get_message_set(req)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0], message1)
+ self.assertEquals(messages[1], message2)
+
+ message3 = KafkaClient.create_message("testing 3")
+ message4 = KafkaClient.create_message("testing 4")
+ req = ProduceRequest("test-produce-consume", 1, [message3, message4])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1"))
+ time.sleep(1)
+ req = FetchRequest("test-produce-consume", 1, 0, 1024)
+ (messages, req) = self.kafka.get_message_set(req)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0], message3)
+ self.assertEquals(messages[1], message4)
+
+ def test_check_offset(self):
+ message1 = KafkaClient.create_message("testing 1")
+ req = ProduceRequest("test-check-offset", 0, [message1])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0"))
+ time.sleep(1)
+ req = FetchRequest("test-check-offset", 0, 0, 1024)
+ (messages, req) = self.kafka.get_message_set(req)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0], message1)
+ assertEquals(req.offset, len(KafkaClient.encode_message(message1)))
def tearDown(self):
self.kafka.close()