From 2a3d231aa61642c57537bc2128dd4f2bd30f35dd Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 22 Feb 2013 23:09:25 -0500 Subject: Protocol and low-level client done, adding tests --- test/integration.py | 407 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 245 insertions(+), 162 deletions(-) (limited to 'test/integration.py') diff --git a/test/integration.py b/test/integration.py index 3971d3f..598b17a 100644 --- a/test/integration.py +++ b/test/integration.py @@ -11,8 +11,7 @@ from threading import Thread, Event import time import unittest -from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest -from kafka.queue import KafkaQueue +from kafka.client08 import * def get_open_port(): sock = socket.socket() @@ -27,12 +26,15 @@ def build_kafka_classpath(): jars += glob.glob(os.path.join(baseDir, "project/boot/scala-2.8.0/lib/*.jar")) jars += glob.glob(os.path.join(baseDir, "core/target/scala_2.8.0/*.jar")) jars += glob.glob(os.path.join(baseDir, "core/lib/*.jar")) - jars += glob.glob(os.path.join(baseDir, "perf/target/scala_2.8.0/kafka*.jar")) jars += glob.glob(os.path.join(baseDir, "core/lib_managed/scala_2.8.0/compile/*.jar")) - return ":".join(["."] + [os.path.abspath(jar) for jar in jars]) + jars += glob.glob(os.path.join(baseDir, "core/target/scala-2.8.0/kafka_2.8.0-*.jar")) + jars += glob.glob(os.path.join(baseDir, "/Users/mumrah/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar")) + cp = ":".join(["."] + [os.path.abspath(jar) for jar in jars]) + cp += ":" + os.path.abspath(os.path.join(baseDir, "conf/log4j.properties")) + return cp class KafkaFixture(Thread): - def __init__(self, port): + def __init__(self, host, port): Thread.__init__(self) self.port = port self.capture = "" @@ -57,7 +59,7 @@ class KafkaFixture(Thread): # Start Kafka args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, build_kafka_classpath(), configFile)) - proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()}) + proc = subprocess.Popen(args, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()}) killed = False while True: @@ -65,6 +67,7 @@ class KafkaFixture(Thread): if proc.stdout in rlist: read = proc.stdout.readline() stdout.write(read) + stdout.flush() self.capture += read if self.shouldDie.is_set(): @@ -88,174 +91,254 @@ class KafkaFixture(Thread): return True time.sleep(0.100) + def close(self): + self.shouldDie.set() -class IntegrationTest(unittest.TestCase): +class ExternalKafkaFixture(object): + def __init__(self, host, port): + print("Using already running Kafka at %s:%d" % (host, port)) + + def close(self): + pass + + +class TestKafkaClient(unittest.TestCase): @classmethod def setUpClass(cls): - port = get_open_port() - cls.server = KafkaFixture(port) - cls.server.start() - cls.server.wait_for("Kafka server started") - cls.kafka = KafkaClient("localhost", port) + if os.environ.has_key('KAFKA_URI'): + parse = urlparse(os.environ['KAFKA_URI']) + (host, port) = (parse.hostname, parse.port) + cls.server = ExternalKafkaFixture(host, port) + cls.client = KafkaClient(host, port) + else: + port = get_open_port() + cls.server = KafkaFixture("localhost", port) + cls.server.start() + cls.server.wait_for("Kafka server started") + cls.client = KafkaClient("localhost", port) @classmethod def tearDownClass(cls): - cls.kafka.close() - cls.server.shouldDie.set() - - def test_send_simple(self): - 
self.kafka.send_messages_simple("test-send-simple", "test 1", "test 2", "test 3") - self.assertTrue(self.server.wait_for("Created log for 'test-send-simple'")) - self.assertTrue(self.server.wait_for("Flushing log 'test-send-simple")) - - def test_produce(self): - # Produce a message, check that the log got created - req = ProduceRequest("test-produce", 0, [KafkaClient.create_message("testing")]) - self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'test-produce'-0")) - - # Same thing, different partition - req = ProduceRequest("test-produce", 1, [KafkaClient.create_message("testing")]) - self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'test-produce'-1")) - - def _test_produce_consume(self, topic, create_func): - # Send two messages and consume them - message1 = create_func("testing 1") - message2 = create_func("testing 2") - req = ProduceRequest(topic, 0, [message1, message2]) - self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for '%s'-0" % topic)) - self.assertTrue(self.server.wait_for("Flushing log '%s-0'" % topic)) - req = FetchRequest(topic, 0, 0, 1024) - (messages, req) = self.kafka.get_message_set(req) - self.assertEquals(len(messages), 2) - self.assertEquals(messages[0].payload, "testing 1") - self.assertEquals(messages[1].payload, "testing 2") - - # Do the same, but for a different partition - message3 = create_func("testing 3") - message4 = create_func("testing 4") - req = ProduceRequest(topic, 1, [message3, message4]) - self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for '%s'-1" % topic)) - self.assertTrue(self.server.wait_for("Flushing log '%s-1'" % topic)) - req = FetchRequest(topic, 1, 0, 1024) - (messages, req) = self.kafka.get_message_set(req) - self.assertEquals(len(messages), 2) - self.assertEquals(messages[0].payload, "testing 3") - self.assertEquals(messages[1].payload, "testing 4") + cls.client.close() + cls.server.close() + + ##################### + # Produce Tests # + ##################### + + def test_produce_many_simple(self): + produce = ProduceRequest("test_produce_many_simple", 0, messages=[ + KafkaProtocol.create_message("Test message %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 100) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 100) + + (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 200) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 200) + + (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_simple", 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 300) + + def test_produce_10k_simple(self): + produce = ProduceRequest("test_produce_10k_simple", 0, messages=[ + KafkaProtocol.create_message("Test message %d" % i) for i in range(10000) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_10k_simple", 0, -1, 1)]) + 
self.assertEquals(offset.offsets[0], 10000) + + def test_produce_many_gzip(self): + message1 = KafkaProtocol.create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) + message2 = KafkaProtocol.create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) + + produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_gzip", 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 200) + + def test_produce_many_snappy(self): + message1 = KafkaProtocol.create_snappy_message(["Snappy 1 %d" % i for i in range(100)]) + message2 = KafkaProtocol.create_snappy_message(["Snappy 2 %d" % i for i in range(100)]) + + produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_many_snappy", 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 200) + + def test_produce_mixed(self): + message1 = KafkaProtocol.create_message("Just a plain message") + message2 = KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100)]) + message3 = KafkaProtocol.create_snappy_message(["Snappy %d" % i for i in range(100)]) + + produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_mixed", 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 201) + + + def test_produce_100k_gzipped(self): + produce = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ + KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100000)]) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 100000) + + ##################### + # Consume Tests # + ##################### + + def test_consume_none(self): + fetch = FetchRequest("test_consume_none", 0, 0, 1024) + + fetch_resp = self.client.send_fetch_request([fetch]).next() + self.assertEquals(fetch_resp.error, 0) + self.assertEquals(fetch_resp.topic, "test_consume_none") + self.assertEquals(fetch_resp.partition, 0) + + messages = list(fetch_resp.messages) + self.assertEquals(len(messages), 0) def test_produce_consume(self): - self._test_produce_consume("test-produce-consume", KafkaClient.create_message) - - def test_produce_consume_snappy(self): - self._test_produce_consume("test-produce-consume-snappy", KafkaClient.create_snappy_message) - - def test_produce_consume_gzip(self): - self._test_produce_consume("test-produce-consume-gzip", KafkaClient.create_gzip_message) - - def test_check_offset(self): - # Produce/consume a message, check that the next offset looks correct - message1 = KafkaClient.create_message("testing 1") - req = ProduceRequest("test-check-offset", 0, [message1]) - self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'test-check-offset'-0")) - self.assertTrue(self.server.wait_for("Flushing 
log 'test-check-offset-0'")) - req = FetchRequest("test-check-offset", 0, 0, 1024) - (messages, nextReq) = self.kafka.get_message_set(req) - self.assertEquals(len(messages), 1) - self.assertEquals(messages[0], message1) - self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1))) - - # Produce another message, consume with the last offset - message2 = KafkaClient.create_message("test 2") - req = ProduceRequest("test-check-offset", 0, [message2]) - self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Flushing log 'test-check-offset-0'")) - - # Verify - (messages, nextReq) = self.kafka.get_message_set(nextReq) - self.assertEquals(len(messages), 1) - self.assertEquals(messages[0], message2) - self.assertEquals(nextReq.offset, len(KafkaClient.encode_message(message1)) + len(KafkaClient.encode_message(message2))) - - def test_iterator(self): - # Produce 100 messages - messages = [] - for i in range(100): - messages.append(KafkaClient.create_message("testing %d" % i)) - req = ProduceRequest("test-iterator", 0, messages) - self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'test-iterator'-0")) - self.assertTrue(self.server.wait_for("Flushing log 'test-iterator-0'")) - - # Initialize an iterator of fetch size 64 bytes - big enough for one message - # but not enough for all 100 messages - cnt = 0 - for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64)): - self.assertEquals(messages[i], msg) - cnt += 1 - self.assertEquals(cnt, 100) - - # Same thing, but don't auto paginate - cnt = 0 - for i, msg in enumerate(self.kafka.iter_messages("test-iterator", 0, 0, 64, False)): - self.assertEquals(messages[i], msg) - cnt += 1 - self.assertTrue(cnt < 100) - - def test_offset_request(self): - # Produce a message to create the topic/partition - message1 = KafkaClient.create_message("testing 1") - req = ProduceRequest("test-offset-request", 0, [message1]) - self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'test-offset-request'-0")) - self.assertTrue(self.server.wait_for("Flushing log 'test-offset-request-0'")) - - t1 = int(time.time()*1000) # now - t2 = t1 + 60000 # one minute from now - req = OffsetRequest("test-offset-request", 0, t1, 1024) - self.kafka.get_offsets(req) - - req = OffsetRequest("test-offset-request", 0, t2, 1024) - self.kafka.get_offsets(req) - - def test_10k_messages(self): - msg_tmpl = "this is a test message with a few bytes in it. this is message number %d" - # TODO 10k actually fails, why? 
- msg = KafkaClient.create_gzip_message(*[msg_tmpl % i for i in range(1000)]) - req = ProduceRequest("test-10k", 0, [msg]) - self.kafka.send_message_set(req) - self.assertTrue(self.server.wait_for("Created log for 'test-10k'-0")) - self.assertTrue(self.server.wait_for("Flushing log 'test-10k-0'")) - #self.assertTrue(self.server.wait_for("Created log for 'test-10k'-1")) - #self.assertTrue(self.server.wait_for("Flushing log 'test-10k-1'")) - - def test_queue(self): - # Send 1000 messages - q = KafkaQueue(self.kafka, "test-queue", [0,1]) - t1 = time.time() - for i in range(1000): - q.put("test %d" % i) - t2 = time.time() + produce = ProduceRequest("test_produce_consume", 0, messages=[ + KafkaProtocol.create_message("Just a test message"), + KafkaProtocol.create_message("Message with a key", "foo"), + ]) - # Wait for the producer to fully flush - time.sleep(2) + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) - # Copy all the messages into a list - t1 = time.time() - consumed = [] - for i in range(1000): - consumed.append(q.get()) - t2 = time.time() + fetch = FetchRequest("test_produce_consume", 0, 0, 1024) - # Verify everything is there - for i in range(1000): - self.assertTrue("test %d" % i in consumed) + fetch_resp = self.client.send_fetch_request([fetch]).next() + self.assertEquals(fetch_resp.error, 0) + + messages = list(fetch_resp.messages) + self.assertEquals(len(messages), 2) + self.assertEquals(messages[0].offset, 0) + self.assertEquals(messages[0].message.value, "Just a test message") + self.assertEquals(messages[0].message.key, None) + self.assertEquals(messages[1].offset, 1) + self.assertEquals(messages[1].message.value, "Message with a key") + self.assertEquals(messages[1].message.key, "foo") + + def test_produce_consume_many(self): + produce = ProduceRequest("test_produce_consume_many", 0, messages=[ + KafkaProtocol.create_message("Test message %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + # 1024 is not enough for 100 messages... 
+ fetch1 = FetchRequest("test_produce_consume_many", 0, 0, 1024) + + (fetch_resp1,) = self.client.send_fetch_request([fetch1]) + + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 100) + messages = list(fetch_resp1.messages) + self.assertTrue(len(messages) < 100) + + # 10240 should be enough + fetch2 = FetchRequest("test_produce_consume_many", 0, 0, 10240) + (fetch_resp2,) = self.client.send_fetch_request([fetch2]) + + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 100) + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 100) + for i, message in enumerate(messages): + self.assertEquals(message.offset, i) + self.assertEquals(message.message.value, "Test message %d" % i) + self.assertEquals(message.message.key, None) + + def test_produce_consume_two_partitions(self): + produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[ + KafkaProtocol.create_message("Partition 0 %d" % i) for i in range(10) + ]) + produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[ + KafkaProtocol.create_message("Partition 1 %d" % i) for i in range(10) + ]) + + for resp in self.client.send_produce_request([produce1, produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + return + + fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024) + fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 10) + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 10) + for i, message in enumerate(messages): + self.assertEquals(message.offset, i) + self.assertEquals(message.message.value, "Partition 0 %d" % i) + self.assertEquals(message.message.key, None) + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 10) + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 10) + for i, message in enumerate(messages): + self.assertEquals(message.offset, i) + self.assertEquals(message.message.value, "Partition 1 %d" % i) + self.assertEquals(message.message.key, None) + + #################### + # Offset Tests # + #################### + + def test_commit_fetch_offsets(self): + req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata") + (resp,) = self.client.send_offset_commit_request("group", [req]) + self.assertEquals(resp.error, 0) + + req = OffsetFetchRequest("test_commit_fetch_offsets", 0) + (resp,) = self.client.send_offset_fetch_request("group", [req]) + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 42) + self.assertEquals(resp.metadata, "metadata") + + + - # Shutdown the queue - q.close() if __name__ == "__main__": logging.basicConfig(level=logging.INFO) -- cgit v1.2.1 From eac51e9c68c50f15962b6c785ede92cb3d512a17 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 1 Mar 2013 15:12:21 -0500 Subject: Integration tests passing --- test/integration.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) (limited to 'test/integration.py') diff --git a/test/integration.py b/test/integration.py index 598b17a..9fa8538 100644 --- a/test/integration.py +++ b/test/integration.py @@ -10,8 +10,9 @@ import tempfile from threading import Thread, Event import time import unittest +from urlparse import urlparse -from kafka.client08 import * 
+from kafka.client import * def get_open_port(): sock = socket.socket() @@ -49,16 +50,26 @@ class KafkaFixture(Thread): stdout = open(os.path.join(logDir, 'stdout'), 'w') # Create the config file + zkChroot = "kafka-python_%s" % self.tmpDir.replace("/", "_") logConfig = "test/resources/log4j.properties" configFile = os.path.join(self.tmpDir, 'server.properties') f = open('test/resources/server.properties', 'r') props = f.read() f = open(configFile, 'w') - f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2}) + f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2, 'zk.chroot': zkChroot}) f.close() + cp = build_kafka_classpath() + + # Create the Zookeeper chroot + args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, zkChroot)) + proc = subprocess.Popen(args) + ret = proc.wait() + assert ret == 0 + + # Start Kafka - args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, build_kafka_classpath(), configFile)) + args = shlex.split("java -Xmx256M -server -Dlog4j.configuration=%s -cp %s kafka.Kafka %s" % (logConfig, cp, configFile)) proc = subprocess.Popen(args, bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env={"JMX_PORT":"%d" % get_open_port()}) killed = False @@ -334,11 +345,7 @@ class TestKafkaClient(unittest.TestCase): (resp,) = self.client.send_offset_fetch_request("group", [req]) self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 42) - self.assertEquals(resp.metadata, "metadata") - - - - + self.assertEquals(resp.metadata, "") # Metadata isn't stored for now if __name__ == "__main__": logging.basicConfig(level=logging.INFO) -- cgit v1.2.1 From 1b721325fe6b170cdfe001749dbd7b750fe59512 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 8 Mar 2013 19:56:51 -0500 Subject: Started on a simple producer and consumer --- test/integration.py | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) (limited to 'test/integration.py') diff --git a/test/integration.py b/test/integration.py index 9fa8538..91917e6 100644 --- a/test/integration.py +++ b/test/integration.py @@ -310,7 +310,6 @@ class TestKafkaClient(unittest.TestCase): for resp in self.client.send_produce_request([produce1, produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - return fetch1 = FetchRequest("test_produce_consume_two_partitions", 0, 0, 1024) fetch2 = FetchRequest("test_produce_consume_two_partitions", 1, 0, 1024) @@ -347,6 +346,32 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(resp.offset, 42) self.assertEquals(resp.metadata, "") # Metadata isn't stored for now + # Producer Tests + + def test_simple_producer(self): + producer = SimpleProducer(self.client, "test_simple_producer") + producer.send_message("one") + producer.send_message("two") + + fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024) + fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024) + fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) + self.assertEquals(fetch_resp1.error, 0) + self.assertEquals(fetch_resp1.highwaterMark, 1) + messages = list(fetch_resp1.messages) + self.assertEquals(len(messages), 1) + self.assertEquals(messages[0].message.value, "one") + self.assertEquals(fetch_resp2.error, 0) + self.assertEquals(fetch_resp2.highwaterMark, 1) + messages = list(fetch_resp2.messages) + self.assertEquals(len(messages), 1) + 
self.assertEquals(messages[0].message.value, "two") + + # Consumer Tests + + def test_consumer(self): + consumer = SimpleConsumer(self.client, "group1", "test_consumer") + if __name__ == "__main__": logging.basicConfig(level=logging.INFO) unittest.main() -- cgit v1.2.1 From 3499e2f6ead76e1c2db6ac754358bd57f9a15268 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 19 Mar 2013 14:03:51 -0400 Subject: Some work on a simple consumer --- test/integration.py | 64 ++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 58 insertions(+), 6 deletions(-) (limited to 'test/integration.py') diff --git a/test/integration.py b/test/integration.py index 91917e6..0f4d9f1 100644 --- a/test/integration.py +++ b/test/integration.py @@ -35,8 +35,10 @@ def build_kafka_classpath(): return cp class KafkaFixture(Thread): - def __init__(self, host, port): + def __init__(self, host, port, broker_id, zk_chroot=None): Thread.__init__(self) + self.broker_id = broker_id + self.zk_chroot = zk_chroot self.port = port self.capture = "" self.shouldDie = Event() @@ -50,19 +52,24 @@ class KafkaFixture(Thread): stdout = open(os.path.join(logDir, 'stdout'), 'w') # Create the config file - zkChroot = "kafka-python_%s" % self.tmpDir.replace("/", "_") + if self.zk_chroot is None: + self.zk_chroot= "kafka-python_%s" % self.tmpDir.replace("/", "_") logConfig = "test/resources/log4j.properties" configFile = os.path.join(self.tmpDir, 'server.properties') f = open('test/resources/server.properties', 'r') props = f.read() f = open(configFile, 'w') - f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2, 'zk.chroot': zkChroot}) + f.write(props % {'broker.id': self.broker_id, + 'kafka.port': self.port, + 'kafka.tmp.dir': logDir, + 'kafka.partitions': 2, + 'zk.chroot': self.zk_chroot}) f.close() cp = build_kafka_classpath() # Create the Zookeeper chroot - args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, zkChroot)) + args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot)) proc = subprocess.Popen(args) ret = proc.wait() assert ret == 0 @@ -123,7 +130,7 @@ class TestKafkaClient(unittest.TestCase): cls.client = KafkaClient(host, port) else: port = get_open_port() - cls.server = KafkaFixture("localhost", port) + cls.server = KafkaFixture("localhost", port, 0) cls.server.start() cls.server.wait_for("Kafka server started") cls.client = KafkaClient("localhost", port) @@ -367,10 +374,55 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(len(messages), 1) self.assertEquals(messages[0].message.value, "two") - # Consumer Tests +class TestConsumer(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Broker 0 + port = get_open_port() + cls.server1 = KafkaFixture("localhost", port, 0) + cls.server1.start() + cls.server1.wait_for("Kafka server started") + + # Broker 1 + zk = cls.server1.zk_chroot + port = get_open_port() + cls.server2 = KafkaFixture("localhost", port, 1, zk) + cls.server2.start() + cls.server2.wait_for("Kafka server started") + + # Client bootstraps from broker 1 + cls.client = KafkaClient("localhost", port) + + @classmethod + def tearDownClass(cls): + cls.client.close() + cls.server1.close() + cls.server2.close() def test_consumer(self): + produce1 = ProduceRequest("test_consumer", 0, messages=[ + KafkaProtocol.create_message("Test message 0 %d" % i) for i in range(100) + ]) + + produce2 = ProduceRequest("test_consumer", 1, messages=[ + 
KafkaProtocol.create_message("Test message 1 %d" % i) for i in range(100) + ]) + + for resp in self.client.send_produce_request([produce1]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + + for resp in self.client.send_produce_request([produce2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 0) + consumer = SimpleConsumer(self.client, "group1", "test_consumer") + all_messages = [] + for message in consumer: + all_messages.append(message) + + self.assertEquals(len(all_messages), 200) + self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes if __name__ == "__main__": logging.basicConfig(level=logging.INFO) -- cgit v1.2.1 From b6d98c07b418b16061ae92392947d5dd6958a708 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Sat, 30 Mar 2013 00:28:00 -0400 Subject: Big code re-org --- test/integration.py | 65 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 22 deletions(-) (limited to 'test/integration.py') diff --git a/test/integration.py b/test/integration.py index 0f4d9f1..b7ad056 100644 --- a/test/integration.py +++ b/test/integration.py @@ -12,7 +12,8 @@ import time import unittest from urlparse import urlparse -from kafka.client import * +from kafka import * +from kafka.common import * def get_open_port(): sock = socket.socket() @@ -146,7 +147,7 @@ class TestKafkaClient(unittest.TestCase): def test_produce_many_simple(self): produce = ProduceRequest("test_produce_many_simple", 0, messages=[ - KafkaProtocol.create_message("Test message %d" % i) for i in range(100) + create_message("Test message %d" % i) for i in range(100) ]) for resp in self.client.send_produce_request([produce]): @@ -172,7 +173,7 @@ class TestKafkaClient(unittest.TestCase): def test_produce_10k_simple(self): produce = ProduceRequest("test_produce_10k_simple", 0, messages=[ - KafkaProtocol.create_message("Test message %d" % i) for i in range(10000) + create_message("Test message %d" % i) for i in range(10000) ]) for resp in self.client.send_produce_request([produce]): @@ -183,8 +184,8 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(offset.offsets[0], 10000) def test_produce_many_gzip(self): - message1 = KafkaProtocol.create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) - message2 = KafkaProtocol.create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) + message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) + message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) produce = ProduceRequest("test_produce_many_gzip", 0, messages=[message1, message2]) @@ -196,8 +197,8 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(offset.offsets[0], 200) def test_produce_many_snappy(self): - message1 = KafkaProtocol.create_snappy_message(["Snappy 1 %d" % i for i in range(100)]) - message2 = KafkaProtocol.create_snappy_message(["Snappy 2 %d" % i for i in range(100)]) + message1 = create_snappy_message(["Snappy 1 %d" % i for i in range(100)]) + message2 = create_snappy_message(["Snappy 2 %d" % i for i in range(100)]) produce = ProduceRequest("test_produce_many_snappy", 0, messages=[message1, message2]) @@ -209,9 +210,9 @@ class TestKafkaClient(unittest.TestCase): self.assertEquals(offset.offsets[0], 200) def test_produce_mixed(self): - message1 = KafkaProtocol.create_message("Just a plain message") - message2 = KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100)]) - message3 = KafkaProtocol.create_snappy_message(["Snappy %d" % i 
for i in range(100)]) + message1 = create_message("Just a plain message") + message2 = create_gzip_message(["Gzipped %d" % i for i in range(100)]) + message3 = create_snappy_message(["Snappy %d" % i for i in range(100)]) produce = ProduceRequest("test_produce_mixed", 0, messages=[message1, message2, message3]) @@ -225,7 +226,7 @@ class TestKafkaClient(unittest.TestCase): def test_produce_100k_gzipped(self): produce = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ - KafkaProtocol.create_gzip_message(["Gzipped %d" % i for i in range(100000)]) + create_gzip_message(["Gzipped %d" % i for i in range(100000)]) ]) for resp in self.client.send_produce_request([produce]): @@ -252,8 +253,8 @@ class TestKafkaClient(unittest.TestCase): def test_produce_consume(self): produce = ProduceRequest("test_produce_consume", 0, messages=[ - KafkaProtocol.create_message("Just a test message"), - KafkaProtocol.create_message("Message with a key", "foo"), + create_message("Just a test message"), + create_message("Message with a key", "foo"), ]) for resp in self.client.send_produce_request([produce]): @@ -276,7 +277,7 @@ class TestKafkaClient(unittest.TestCase): def test_produce_consume_many(self): produce = ProduceRequest("test_produce_consume_many", 0, messages=[ - KafkaProtocol.create_message("Test message %d" % i) for i in range(100) + create_message("Test message %d" % i) for i in range(100) ]) for resp in self.client.send_produce_request([produce]): @@ -308,10 +309,10 @@ class TestKafkaClient(unittest.TestCase): def test_produce_consume_two_partitions(self): produce1 = ProduceRequest("test_produce_consume_two_partitions", 0, messages=[ - KafkaProtocol.create_message("Partition 0 %d" % i) for i in range(10) + create_message("Partition 0 %d" % i) for i in range(10) ]) produce2 = ProduceRequest("test_produce_consume_two_partitions", 1, messages=[ - KafkaProtocol.create_message("Partition 1 %d" % i) for i in range(10) + create_message("Partition 1 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce1, produce2]): @@ -400,22 +401,25 @@ class TestConsumer(unittest.TestCase): cls.server2.close() def test_consumer(self): + # Produce 100 messages to partition 0 produce1 = ProduceRequest("test_consumer", 0, messages=[ - KafkaProtocol.create_message("Test message 0 %d" % i) for i in range(100) - ]) - - produce2 = ProduceRequest("test_consumer", 1, messages=[ - KafkaProtocol.create_message("Test message 1 %d" % i) for i in range(100) + create_message("Test message 0 %d" % i) for i in range(100) ]) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) + # Produce 100 messages to partition 1 + produce2 = ProduceRequest("test_consumer", 1, messages=[ + create_message("Test message 1 %d" % i) for i in range(100) + ]) + for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) + # Start a consumer consumer = SimpleConsumer(self.client, "group1", "test_consumer") all_messages = [] for message in consumer: @@ -424,6 +428,23 @@ class TestConsumer(unittest.TestCase): self.assertEquals(len(all_messages), 200) self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes + # Produce more messages + produce3 = ProduceRequest("test_consumer", 1, messages=[ + create_message("Test message 3 %d" % i) for i in range(10) + ]) + + for resp in self.client.send_produce_request([produce3]): + self.assertEquals(resp.error, 0) + 
self.assertEquals(resp.offset, 100) + + # Start a new consumer, make sure we only get the newly produced messages + consumer = SimpleConsumer(self.client, "group1", "test_consumer") + + all_messages = [] + for message in consumer: + all_messages.append(message) + self.assertEquals(len(all_messages), 10) + if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.DEBUG) unittest.main() -- cgit v1.2.1 From 0678a452ca7ad5fba8e947cbfcf8fcb0f87b902c Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 1 Apr 2013 14:56:59 -0400 Subject: Refactoring a bit, cleanup for 0.8 Marking some stuff as not compatible for 0.8 (will be added in 0.8.1) --- test/integration.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) (limited to 'test/integration.py') diff --git a/test/integration.py b/test/integration.py index b7ad056..e51b398 100644 --- a/test/integration.py +++ b/test/integration.py @@ -243,7 +243,7 @@ class TestKafkaClient(unittest.TestCase): def test_consume_none(self): fetch = FetchRequest("test_consume_none", 0, 0, 1024) - fetch_resp = self.client.send_fetch_request([fetch]).next() + fetch_resp = self.client.send_fetch_request([fetch])[0] self.assertEquals(fetch_resp.error, 0) self.assertEquals(fetch_resp.topic, "test_consume_none") self.assertEquals(fetch_resp.partition, 0) @@ -263,7 +263,7 @@ class TestKafkaClient(unittest.TestCase): fetch = FetchRequest("test_produce_consume", 0, 0, 1024) - fetch_resp = self.client.send_fetch_request([fetch]).next() + fetch_resp = self.client.send_fetch_request([fetch])[0] self.assertEquals(fetch_resp.error, 0) messages = list(fetch_resp.messages) @@ -343,6 +343,7 @@ class TestKafkaClient(unittest.TestCase): # Offset Tests # #################### + @unittest.skip("No supported until 0.8.1") def test_commit_fetch_offsets(self): req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata") (resp,) = self.client.send_offset_commit_request("group", [req]) @@ -428,22 +429,20 @@ class TestConsumer(unittest.TestCase): self.assertEquals(len(all_messages), 200) self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes - # Produce more messages - produce3 = ProduceRequest("test_consumer", 1, messages=[ - create_message("Test message 3 %d" % i) for i in range(10) - ]) - - for resp in self.client.send_produce_request([produce3]): - self.assertEquals(resp.error, 0) - self.assertEquals(resp.offset, 100) + consumer.seek(-10, 2) + all_messages = [] + for message in consumer: + all_messages.append(message) - # Start a new consumer, make sure we only get the newly produced messages - consumer = SimpleConsumer(self.client, "group1", "test_consumer") + self.assertEquals(len(all_messages), 10) + consumer.seek(-13, 2) all_messages = [] for message in consumer: all_messages.append(message) - self.assertEquals(len(all_messages), 10) + + self.assertEquals(len(all_messages), 13) + if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) -- cgit v1.2.1 From 864430c8c8ef0c1d05f7e62451320919b3be8eb7 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 1 Apr 2013 21:09:48 -0400 Subject: Bring acks and timeout down to the client --- test/integration.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) (limited to 'test/integration.py') diff --git a/test/integration.py b/test/integration.py index e51b398..04d5979 100644 --- a/test/integration.py +++ b/test/integration.py @@ -359,22 +359,23 @@ class TestKafkaClient(unittest.TestCase): def 
test_simple_producer(self): producer = SimpleProducer(self.client, "test_simple_producer") - producer.send_message("one") - producer.send_message("two") + producer.send_messages("one", "two") + producer.send_messages("three") fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024) fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) self.assertEquals(fetch_resp1.error, 0) - self.assertEquals(fetch_resp1.highwaterMark, 1) + self.assertEquals(fetch_resp1.highwaterMark, 2) messages = list(fetch_resp1.messages) - self.assertEquals(len(messages), 1) + self.assertEquals(len(messages), 2) self.assertEquals(messages[0].message.value, "one") + self.assertEquals(messages[1].message.value, "two") self.assertEquals(fetch_resp2.error, 0) self.assertEquals(fetch_resp2.highwaterMark, 1) messages = list(fetch_resp2.messages) self.assertEquals(len(messages), 1) - self.assertEquals(messages[0].message.value, "two") + self.assertEquals(messages[0].message.value, "three") class TestConsumer(unittest.TestCase): @classmethod -- cgit v1.2.1 From 1c5081e6b3237067308b5819a121e420e45d56b0 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 1 Apr 2013 22:06:20 -0400 Subject: Update kafka-src to latest 0.8 Fix a broken test (100k was too much to send in one batch) --- test/integration.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) (limited to 'test/integration.py') diff --git a/test/integration.py b/test/integration.py index 04d5979..a1fcce7 100644 --- a/test/integration.py +++ b/test/integration.py @@ -225,14 +225,25 @@ class TestKafkaClient(unittest.TestCase): def test_produce_100k_gzipped(self): - produce = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ - create_gzip_message(["Gzipped %d" % i for i in range(100000)]) + req1 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ + create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) ]) - for resp in self.client.send_produce_request([produce]): + for resp in self.client.send_produce_request([req1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) + (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)]) + self.assertEquals(offset.offsets[0], 50000) + + req2 = ProduceRequest("test_produce_100k_gzipped", 0, messages=[ + create_gzip_message(["Gzipped batch 2, message %d" % i for i in range(50000)]) + ]) + + for resp in self.client.send_produce_request([req2]): + self.assertEquals(resp.error, 0) + self.assertEquals(resp.offset, 50000) + (offset, ) = self.client.send_offset_request([OffsetRequest("test_produce_100k_gzipped", 0, -1, 1)]) self.assertEquals(offset.offsets[0], 100000) -- cgit v1.2.1 From c275c023c5958c3b2b7d1aaf8642f2475d7780c8 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 2 Apr 2013 19:55:51 -0400 Subject: Update kafka-src to latest trunk, enable 0.8.1 features --- test/integration.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'test/integration.py') diff --git a/test/integration.py b/test/integration.py index a1fcce7..609cfc6 100644 --- a/test/integration.py +++ b/test/integration.py @@ -6,6 +6,7 @@ import shlex import shutil import socket import subprocess +import sys import tempfile from threading import Thread, Event import time @@ -73,7 +74,8 @@ class KafkaFixture(Thread): args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot)) proc = 
subprocess.Popen(args) ret = proc.wait() - assert ret == 0 + if ret != 0: + sys.exit(1) # Start Kafka @@ -354,7 +356,6 @@ class TestKafkaClient(unittest.TestCase): # Offset Tests # #################### - @unittest.skip("No supported until 0.8.1") def test_commit_fetch_offsets(self): req = OffsetCommitRequest("test_commit_fetch_offsets", 0, 42, "metadata") (resp,) = self.client.send_offset_commit_request("group", [req]) -- cgit v1.2.1
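
Taken together, the series lands on the low-level 0.8 wire-protocol client that the tests above exercise. The following is a minimal sketch of the produce/fetch/offset round-trip, reconstructed from the calls visible in the diffs (tests such as test_produce_consume and test_produce_many_simple); the broker address, port, and topic name are placeholders, and error handling is elided:

    from kafka import *          # KafkaClient, create_message, ... (as imported by the tests)
    from kafka.common import *   # ProduceRequest, FetchRequest, OffsetRequest, ...

    client = KafkaClient("localhost", 9092)   # hypothetical broker; the tests use a random open port

    # Produce a batch of two messages to partition 0 of the topic.
    produce = ProduceRequest("example_topic", 0, messages=[
        create_message("message one"),
        create_message("message two"),
    ])
    for resp in client.send_produce_request([produce]):
        assert resp.error == 0   # 0 means no error; resp.offset is the batch's base offset

    # Fetch them back, starting at offset 0, with a 1024-byte size limit.
    fetch = FetchRequest("example_topic", 0, 0, 1024)
    fetch_resp = client.send_fetch_request([fetch])[0]
    for message in fetch_resp.messages:
        print("%d: %s" % (message.offset, message.message.value))

    # Ask the broker for the log-end offset: time -1 means "latest", max 1 offset returned.
    (offset,) = client.send_offset_request([OffsetRequest("example_topic", 0, -1, 1)])
    assert offset.offsets[0] == 2

    client.close()

Note the design choice the tests rely on: send_*_request calls take a list of requests and return one response per request, which is why single-request calls unpack with [0] or a one-element tuple.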
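The offset commit/fetch path toggled across the series (skipped as "not supported until 0.8.1", then re-enabled once kafka-src moved to trunk) follows the same shape. A hedged sketch of what test_commit_fetch_offsets does, assuming a broker build that accepts these requests; names are placeholders:

    # Commit offset 42 for the consumer group on partition 0.
    req = OffsetCommitRequest("example_topic", 0, 42, "some metadata")
    (resp,) = client.send_offset_commit_request("example_group", [req])
    assert resp.error == 0

    # Fetch it back for the same group.
    req = OffsetFetchRequest("example_topic", 0)
    (resp,) = client.send_offset_fetch_request("example_group", [req])
    assert resp.offset == 42
    assert resp.metadata == ""   # per the later patches, metadata is not stored yet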
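The last few patches also introduce the higher-level helpers. A minimal usage sketch of SimpleProducer and SimpleConsumer as these tests use them — topic and group names are placeholders, and the partition-spreading behavior is inferred from the assertions (the two send_messages batches land on partitions 0 and 1 respectively):

    producer = SimpleProducer(client, "example_topic")
    producer.send_messages("one", "two")   # one batch; the test finds it on partition 0
    producer.send_messages("three")        # next batch; the test finds it on partition 1

    consumer = SimpleConsumer(client, "example_group", "example_topic")
    for message in consumer:               # drains all partitions until caught up
        print(message)

    # seek(offset, whence=2) positions relative to the end of the log:
    consumer.seek(-10, 2)                  # replay the last 10 messages
    assert len(list(consumer)) == 10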