summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2012-10-02 20:26:12 -0400
committerDavid Arthur <mumrah@gmail.com>2012-10-02 20:26:12 -0400
commitd9aafb14c24e92a114a5e5a6c2de284e22d6e01b (patch)
tree3736e1c4f9ab7d9c9303012640652120e2ffa2ad /test
parentcab6fee1e76fa133898e9c9570da6d432066d1c3 (diff)
downloadkafka-python-d9aafb14c24e92a114a5e5a6c2de284e22d6e01b.tar.gz
Isn't it nice when tests actually find bugs
Diffstat (limited to 'test')
-rw-r--r--test/integration.py50
1 files changed, 48 insertions, 2 deletions
diff --git a/test/integration.py b/test/integration.py
index 03e988e..dea3f2a 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -1,4 +1,5 @@
import glob
+import logging
import os
import select
import shlex
@@ -11,7 +12,7 @@ from threading import Thread, Event
import time
import unittest
-from kafka.client import KafkaClient, ProduceRequest, FetchRequest
+from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
def get_open_port():
sock = socket.socket()
@@ -70,7 +71,7 @@ class KafkaFixture(Thread):
killed = True
if proc.poll() is not None:
- #shutil.rmtree(self.tmpDir)
+ shutil.rmtree(self.tmpDir)
if killed:
break
else:
@@ -101,6 +102,11 @@ class IntegrationTest(unittest.TestCase):
cls.kafka.close()
cls.server.shouldDie.set()
+ def test_send_simple(self):
+ self.kafka.send_messages_simple("test-send-simple", "test 1", "test 2", "test 3")
+ self.assertTrue(self.server.wait_for("Created log for 'test-send-simple'"))
+ self.assertTrue(self.server.wait_for("Flushing log 'test-send-simple"))
+
def test_produce(self):
# Produce a message, check that the log got created
req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
@@ -164,6 +170,46 @@ class IntegrationTest(unittest.TestCase):
self.assertEquals(messages[0], message2)
self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2)))
+ def test_iterator(self):
+ # Produce 100 messages
+ messages = []
+ for i in range(100):
+ messages.append(KafkaClient.create_message("testing %d" % i))
+ req = ProduceRequest("test-iterator", 0, messages)
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-iterator'-0"))
+ self.assertTrue(self.server.wait_for("Flushing log 'test-iterator-0'"))
+
+ # Initialize an iterator of fetch size 64 bytes - big enough for one message
+ # but not enough for all 100 messages
+ cnt = 0
+ for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64)):
+ self.assertEquals(messages[i], msg)
+ cnt += 1
+ self.assertEquals(cnt, 100)
+
+ # Same thing, but don't auto paginate
+ cnt = 0
+ for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64, False)):
+ self.assertEquals(messages[i], msg)
+ cnt += 1
+ self.assertTrue(cnt < 100)
+
+ def test_offset_request(self):
+ # Produce a message to create the topic/partition
+ message1 = KafkaClient.create_message("testing 1")
+ req = ProduceRequest("test-offset-request", 0, [message1])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-offset-request'-0"))
+ self.assertTrue(self.server.wait_for("Flushing log 'test-offset-request-0'"))
+
+ t1 = int(time.time()*1000) # now
+ t2 = t1 + 60000 # one minute from now
+ req = OffsetRequest("test-offset-request", 0, t1, 1024)
+ print self.kafka.get_offsets(req)
+
+ req = OffsetRequest("test-offset-request", 0, t2, 1024)
+ print self.kafka.get_offsets(req)
if __name__ == "__main__":
unittest.main()