diff options
author | David Arthur <mumrah@gmail.com> | 2012-10-02 20:26:12 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2012-10-02 20:26:12 -0400 |
commit | d9aafb14c24e92a114a5e5a6c2de284e22d6e01b (patch) | |
tree | 3736e1c4f9ab7d9c9303012640652120e2ffa2ad /test | |
parent | cab6fee1e76fa133898e9c9570da6d432066d1c3 (diff) | |
download | kafka-python-d9aafb14c24e92a114a5e5a6c2de284e22d6e01b.tar.gz |
Isn't it nice when tests actually find bugs
Diffstat (limited to 'test')
-rw-r--r-- | test/integration.py | 50 |
1 files changed, 48 insertions, 2 deletions
diff --git a/test/integration.py b/test/integration.py index 03e988e..dea3f2a 100644 --- a/test/integration.py +++ b/test/integration.py @@ -1,4 +1,5 @@ import glob +import logging import os import select import shlex @@ -11,7 +12,7 @@ from threading import Thread, Event import time import unittest -from kafka.client import KafkaClient, ProduceRequest, FetchRequest +from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest def get_open_port(): sock = socket.socket() @@ -70,7 +71,7 @@ class KafkaFixture(Thread): killed = True if proc.poll() is not None: - #shutil.rmtree(self.tmpDir) + shutil.rmtree(self.tmpDir) if killed: break else: @@ -101,6 +102,11 @@ class IntegrationTest(unittest.TestCase): cls.kafka.close() cls.server.shouldDie.set() + def test_send_simple(self): + self.kafka.send_messages_simple("test-send-simple", "test 1", "test 2", "test 3") + self.assertTrue(self.server.wait_for("Created log for 'test-send-simple'")) + self.assertTrue(self.server.wait_for("Flushing log 'test-send-simple")) + def test_produce(self): # Produce a message, check that the log got created req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")]) @@ -164,6 +170,46 @@ class IntegrationTest(unittest.TestCase): self.assertEquals(messages[0], message2) self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2))) + def test_iterator(self): + # Produce 100 messages + messages = [] + for i in range(100): + messages.append(KafkaClient.create_message("testing %d" % i)) + req = ProduceRequest("test-iterator", 0, messages) + self.kafka.send_message_set(req) + self.assertTrue(self.server.wait_for("Created log for 'test-iterator'-0")) + self.assertTrue(self.server.wait_for("Flushing log 'test-iterator-0'")) + + # Initialize an iterator of fetch size 64 bytes - big enough for one message + # but not enough for all 100 messages + cnt = 0 + for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64)): + self.assertEquals(messages[i], msg) + cnt += 1 + self.assertEquals(cnt, 100) + + # Same thing, but don't auto paginate + cnt = 0 + for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64, False)): + self.assertEquals(messages[i], msg) + cnt += 1 + self.assertTrue(cnt < 100) + + def test_offset_request(self): + # Produce a message to create the topic/partition + message1 = KafkaClient.create_message("testing 1") + req = ProduceRequest("test-offset-request", 0, [message1]) + self.kafka.send_message_set(req) + self.assertTrue(self.server.wait_for("Created log for 'test-offset-request'-0")) + self.assertTrue(self.server.wait_for("Flushing log 'test-offset-request-0'")) + + t1 = int(time.time()*1000) # now + t2 = t1 + 60000 # one minute from now + req = OffsetRequest("test-offset-request", 0, t1, 1024) + print self.kafka.get_offsets(req) + + req = OffsetRequest("test-offset-request", 0, t2, 1024) + print self.kafka.get_offsets(req) if __name__ == "__main__": unittest.main() |