diff options
Diffstat (limited to 'test/integration.py')
-rw-r--r-- | test/integration.py | 45 |
1 files changed, 41 insertions, 4 deletions
diff --git a/test/integration.py b/test/integration.py index 0ce8b66..2779898 100644 --- a/test/integration.py +++ b/test/integration.py @@ -83,13 +83,50 @@ class IntegrationTest(unittest.TestCase): self.kafka = KafkaClient("localhost", port) def test_produce(self): - req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")]) + req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")]) self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'my-topic'-0")) + self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0")) - req = ProduceRequest("my-topic", 1, [KafkaClient.create_message("testing")]) + req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")]) self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'my-topic'-1")) + self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1")) + + def test_produce_consume(self): + message1 = KafkaClient.create_message("testing 1") + message2 = KafkaClient.create_message("testing 2") + req = ProduceRequest("test-produce-consume", 0, [message1, message2]) + self.kafka.send_message_set(req) + self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0")) + time.sleep(1) + req = FetchRequest("test-produce-consume", 0, 0, 1024) + (messages, req) = self.kafka.get_message_set(req) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0], message1) + self.assertEquals(messages[1], message2) + + message3 = KafkaClient.create_message("testing 3") + message4 = KafkaClient.create_message("testing 4") + req = ProduceRequest("test-produce-consume", 1, [message3, message4]) + self.kafka.send_message_set(req) + self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1")) + time.sleep(1) + req = FetchRequest("test-produce-consume", 1, 0, 1024) + (messages, req) = self.kafka.get_message_set(req) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0], message3) + self.assertEquals(messages[1], message4) + + def test_check_offset(self): + message1 = KafkaClient.create_message("testing 1") + req = ProduceRequest("test-check-offset", 0, [message1]) + self.kafka.send_message_set(req) + self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0")) + time.sleep(1) + req = FetchRequest("test-check-offset", 0, 0, 1024) + (messages, req) = self.kafka.get_message_set(req) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0], message1) + assertEquals(req.offset, len(KafkaClient.encode_message(message1))) def tearDown(self): self.kafka.close() |