summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2012-09-28 20:46:50 -0400
committerDavid Arthur <mumrah@gmail.com>2012-09-28 20:46:50 -0400
commit478de24d5b4a9c73c2fc969f3fd7cf8ff0710ecb (patch)
tree44d2490febea4ab53a3f365e6eacc585b2a63ddd
parent51d8bbb231b29c006323d0705f8819f592aeb6e7 (diff)
downloadkafka-python-478de24d5b4a9c73c2fc969f3fd7cf8ff0710ecb.tar.gz
Adding some integration tests
-rw-r--r--example.py2
-rw-r--r--kafka.py5
-rw-r--r--test/integration.py45
3 files changed, 44 insertions, 8 deletions
diff --git a/example.py b/example.py
index 6ec223c..286bfdb 100644
--- a/example.py
+++ b/example.py
@@ -4,7 +4,7 @@ from kafka import KafkaClient, FetchRequest, ProduceRequest
def produce_example(kafka):
message = kafka.create_message("testing")
- request = ProduceRequest("my-topic", 0, [message])
+ request = ProduceRequest("my-topic", -1, [message])
kafka.send_message_set(request)
def consume_example(kafka):
diff --git a/kafka.py b/kafka.py
index 80abed3..ff9f53d 100644
--- a/kafka.py
+++ b/kafka.py
@@ -584,18 +584,17 @@ class KafkaClient(object):
# Simple User API #
#######################
- def send_messages_simple(self, topic, partition, *payloads):
+ def send_messages_simple(self, topic, *payloads):
"""
Send one or more strings to Kafka
Params
======
topic: string
- partition: int
payloads: strings
"""
messages = tuple([create_message(payload) for payload in payloads])
- self.send_message_set(ProduceRequest(topic, partition, messages))
+ self.send_message_set(ProduceRequest(topic, -1, messages))
def iter_messages(self, topic, partition, offset, size, auto=True):
"""
diff --git a/test/integration.py b/test/integration.py
index 0ce8b66..2779898 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -83,13 +83,50 @@ class IntegrationTest(unittest.TestCase):
self.kafka = KafkaClient("localhost", port)
def test_produce(self):
- req = ProduceRequest("my-topic", 0, [KafkaClient.create_message("testing")])
+ req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'my-topic'-0"))
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0"))
- req = ProduceRequest("my-topic", 1, [KafkaClient.create_message("testing")])
+ req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")])
self.kafka.send_message_set(req)
- self.assertTrue(self.server.wait_for("Created log for 'my-topic'-1"))
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1"))
+
+ def test_produce_consume(self):
+ message1 = KafkaClient.create_message("testing 1")
+ message2 = KafkaClient.create_message("testing 2")
+ req = ProduceRequest("test-produce-consume", 0, [message1, message2])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-0"))
+ time.sleep(1)
+ req = FetchRequest("test-produce-consume", 0, 0, 1024)
+ (messages, req) = self.kafka.get_message_set(req)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0], message1)
+ self.assertEquals(messages[1], message2)
+
+ message3 = KafkaClient.create_message("testing 3")
+ message4 = KafkaClient.create_message("testing 4")
+ req = ProduceRequest("test-produce-consume", 1, [message3, message4])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-produce-consume'-1"))
+ time.sleep(1)
+ req = FetchRequest("test-produce-consume", 1, 0, 1024)
+ (messages, req) = self.kafka.get_message_set(req)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0], message3)
+ self.assertEquals(messages[1], message4)
+
+ def test_check_offset(self):
+ message1 = KafkaClient.create_message("testing 1")
+ req = ProduceRequest("test-check-offset", 0, [message1])
+ self.kafka.send_message_set(req)
+ self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0"))
+ time.sleep(1)
+ req = FetchRequest("test-check-offset", 0, 0, 1024)
+ (messages, req) = self.kafka.get_message_set(req)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0], message1)
+ assertEquals(req.offset, len(KafkaClient.encode_message(message1)))
def tearDown(self):
self.kafka.close()