From 3499e2f6ead76e1c2db6ac754358bd57f9a15268 Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Tue, 19 Mar 2013 14:03:51 -0400
Subject: Some work on a simple consumer

---
 kafka/client.py                  | 36 +++++++++++++++++++---
 test/integration.py              | 64 ++++++++++++++++++++++++++++++++++++----
 test/resources/server.properties |  2 +-
 3 files changed, 91 insertions(+), 11 deletions(-)

diff --git a/kafka/client.py b/kafka/client.py
index 3f1fa39..8dfc4f0 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -685,7 +685,7 @@
         for produce_response in KafkaProtocol.decode_produce_response(response):
             # Check for errors
             if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR:
-                raise Exception("ProduceRequest for %s failed with errorcode=%d",
+                raise Exception("ProduceRequest for %s failed with errorcode=%d" %
                         (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
             # Run the callback
             if callback is not None:
@@ -825,6 +825,9 @@
         resp = self.client.send_produce_request([req]).next()
 
 class SimpleConsumer(object):
+    """
+    A simple consumer implementation that consumes all partitions for a topic
+    """
     def __init__(self, client, group, topic):
         self.client = client
         self.topic = topic
@@ -832,7 +835,7 @@
         self.client.load_metadata_for_topics(topic)
         self.offsets = {}
 
-        def get_or_init_offset(resp):
+        def get_or_init_offset_callback(resp):
             if resp.error == ErrorMapping.NO_ERROR:
                 return resp.offset
             elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
@@ -843,8 +846,33 @@
 
         for partition in self.client.topic_partitions[topic]:
             req = OffsetFetchRequest(topic, partition)
-            (offset,) = self.client.send_offset_fetch_request(group, [req], callback=get_or_init_offset, fail_on_error=False)
+            (offset,) = self.client.send_offset_fetch_request(group, [req],
+                    callback=get_or_init_offset_callback, fail_on_error=False)
             self.offsets[partition] = offset
 
-        print self.offsets
+
+    def __iter__(self):
+        iters = {}
+        for partition, offset in self.offsets.items():
+            iters[partition] = self.__iter_partition__(partition, offset)
+
+        while True:
+            for it in iters.values():
+                yield it.next()
+
+    def __iter_partition__(self, partition, offset):
+        while True:
+            req = FetchRequest(self.topic, partition, offset, 1024)
+            (resp,) = self.client.send_fetch_request([req])
+            assert resp.topic == self.topic
+            assert resp.partition == partition
+            next_offset = None
+            for message in resp.messages:
+                next_offset = message.offset
+                yield message
+            if next_offset is None:
+                raise StopIteration("No more messages")
+            else:
+                offset = next_offset + 1
+            # Commit offset here?
 
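The SimpleConsumer added above is meant to be used as a plain iterator: __init__ fetches (or initializes) the committed offset for every partition of the topic, and iteration interleaves one message at a time from each partition. A minimal usage sketch, assuming a local broker on port 9092 and illustrative group/topic names that are not part of the patch:

    from kafka.client import KafkaClient, SimpleConsumer

    # Bootstrap from any broker; SimpleConsumer loads the topic metadata and
    # the per-partition offsets for the consumer group in its constructor.
    client = KafkaClient("localhost", 9092)
    consumer = SimpleConsumer(client, "my-group", "my-topic")

    # Each yielded item comes from a fetch response and carries .offset and
    # .message, matching the assertions in the integration test below.
    for message in consumer:
        print message.message.value
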
diff --git a/test/integration.py b/test/integration.py
index 91917e6..0f4d9f1 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -35,8 +35,10 @@ def build_kafka_classpath():
     return cp
 
 class KafkaFixture(Thread):
-    def __init__(self, host, port):
+    def __init__(self, host, port, broker_id, zk_chroot=None):
         Thread.__init__(self)
+        self.broker_id = broker_id
+        self.zk_chroot = zk_chroot
         self.port = port
         self.capture = ""
         self.shouldDie = Event()
@@ -50,19 +52,24 @@
         stdout = open(os.path.join(logDir, 'stdout'), 'w')
 
         # Create the config file
-        zkChroot = "kafka-python_%s" % self.tmpDir.replace("/", "_")
+        if self.zk_chroot is None:
+            self.zk_chroot = "kafka-python_%s" % self.tmpDir.replace("/", "_")
         logConfig = "test/resources/log4j.properties"
         configFile = os.path.join(self.tmpDir, 'server.properties')
         f = open('test/resources/server.properties', 'r')
         props = f.read()
         f = open(configFile, 'w')
-        f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2, 'zk.chroot': zkChroot})
+        f.write(props % {'broker.id': self.broker_id,
+                         'kafka.port': self.port,
+                         'kafka.tmp.dir': logDir,
+                         'kafka.partitions': 2,
+                         'zk.chroot': self.zk_chroot})
         f.close()
 
         cp = build_kafka_classpath()
 
         # Create the Zookeeper chroot
-        args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, zkChroot))
+        args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot))
         proc = subprocess.Popen(args)
         ret = proc.wait()
         assert ret == 0
@@ -123,7 +130,7 @@
             cls.client = KafkaClient(host, port)
         else:
             port = get_open_port()
-            cls.server = KafkaFixture("localhost", port)
+            cls.server = KafkaFixture("localhost", port, 0)
             cls.server.start()
             cls.server.wait_for("Kafka server started")
             cls.client = KafkaClient("localhost", port)
@@ -367,10 +374,55 @@
         self.assertEquals(len(messages), 1)
         self.assertEquals(messages[0].message.value, "two")
 
-    # Consumer Tests
+class TestConsumer(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        # Broker 0
+        port = get_open_port()
+        cls.server1 = KafkaFixture("localhost", port, 0)
+        cls.server1.start()
+        cls.server1.wait_for("Kafka server started")
+
+        # Broker 1
+        zk = cls.server1.zk_chroot
+        port = get_open_port()
+        cls.server2 = KafkaFixture("localhost", port, 1, zk)
+        cls.server2.start()
+        cls.server2.wait_for("Kafka server started")
+
+        # Client bootstraps from broker 1
+        cls.client = KafkaClient("localhost", port)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.client.close()
+        cls.server1.close()
+        cls.server2.close()
 
     def test_consumer(self):
+        produce1 = ProduceRequest("test_consumer", 0, messages=[
+            KafkaProtocol.create_message("Test message 0 %d" % i) for i in range(100)
+        ])
+
+        produce2 = ProduceRequest("test_consumer", 1, messages=[
+            KafkaProtocol.create_message("Test message 1 %d" % i) for i in range(100)
+        ])
+
+        for resp in self.client.send_produce_request([produce1]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
+        for resp in self.client.send_produce_request([produce2]):
+            self.assertEquals(resp.error, 0)
+            self.assertEquals(resp.offset, 0)
+
         consumer = SimpleConsumer(self.client, "group1", "test_consumer")
+        all_messages = []
+        for message in consumer:
+            all_messages.append(message)
+
+        self.assertEquals(len(all_messages), 200)
+        self.assertEquals(len(all_messages), len(set(all_messages)))  # make sure there are no dupes
 
 if __name__ == "__main__":
     logging.basicConfig(level=logging.INFO)
diff --git a/test/resources/server.properties b/test/resources/server.properties
index 85bee88..88a6e84 100644
--- a/test/resources/server.properties
+++ b/test/resources/server.properties
@@ -17,7 +17,7 @@
 
 ############################# Server Basics #############################
 
 # The id of the broker. This must be set to a unique integer for each broker.
-broker.id=0
+broker.id=%(broker.id)d
 
 ############################# Socket Server Settings #############################
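The fixture fills the server.properties template with Python's dict-based %-formatting, which is why broker.id becomes the %(broker.id)d placeholder above. A standalone sketch of that substitution, using an abbreviated template; only the broker.id placeholder is taken verbatim from the patch, the other keys and format specifiers are assumptions for illustration:

    # Abbreviated stand-in for test/resources/server.properties
    props = (
        "broker.id=%(broker.id)d\n"
        "port=%(kafka.port)d\n"
        "log.dir=%(kafka.tmp.dir)s\n"
        "num.partitions=%(kafka.partitions)d\n"
        "zk.connect=localhost:2181/%(zk.chroot)s\n"
    )

    # Same dict keys that KafkaFixture.run() passes when writing the config
    print props % {'broker.id': 0,
                   'kafka.port': 9092,
                   'kafka.tmp.dir': '/tmp/kafka-test',
                   'kafka.partitions': 2,
                   'zk.chroot': 'kafka-python_test'}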