summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2012-09-28 20:46:50 -0400
committerDavid Arthur <mumrah@gmail.com>2012-09-28 20:46:50 -0400
commit478de24d5b4a9c73c2fc969f3fd7cf8ff0710ecb (patch)
tree44d2490febea4ab53a3f365e6eacc585b2a63ddd /test
parent51d8bbb231b29c006323d0705f8819f592aeb6e7 (diff)
downloadkafka-python-478de24d5b4a9c73c2fc969f3fd7cf8ff0710ecb.tar.gz
Adding some integration tests
Diffstat (limited to 'test')
-rw-r--r--test/integration.py45
1 files changed, 41 insertions, 4 deletions
diff --git a/test/integration.py b/test/integration.py
index 0ce8b66..2779898 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -83,13 +83,50 @@ class IntegrationTest(unittest.TestCase):
self.kafka = KafkaClient("localhost", port)
def test_produce(self):
- req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])
+ req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'my-topic'-0"))
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
- req = ProduceRequest("my-topic", 1, [KafkaClient.create_message("testing")])
+ req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'my-topic'-1"))
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
+
+ def test_produce_consume(self):
+ message1 = KafkaClient.create_message("testing 1")
+ message2 = KafkaClient.create_message("testing 2")
+ req = ProduceRequest("test-produce-consume", 0, [message1, message2])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0"))
+ time.sleep(1)
+ req = FetchRequest("test-produce-consume", 0, 0, 1024)
+ (messages, req) = self.kafka.get_message_set(req)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0], message1)
+ self.assertEquals(messages[1], message2)
+
+ message3 = KafkaClient.create_message("testing 3")
+ message4 = KafkaClient.create_message("testing 4")
+ req = ProduceRequest("test-produce-consume", 1, [message3, message4])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1"))
+ time.sleep(1)
+ req = FetchRequest("test-produce-consume", 1, 0, 1024)
+ (messages, req) = self.kafka.get_message_set(req)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0], message3)
+ self.assertEquals(messages[1], message4)
+
+ def test_check_offset(self):
+ message1 = KafkaClient.create_message("testing 1")
+ req = ProduceRequest("test-check-offset", 0, [message1])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0"))
+ time.sleep(1)
+ req = FetchRequest("test-check-offset", 0, 0, 1024)
+ (messages, req) = self.kafka.get_message_set(req)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0], message1)
+ assertEquals(req.offset, len(KafkaClient.encode_message(message1)))
def tearDown(self):
self.kafka.close()