summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-03-19 14:03:51 -0400
committerDavid Arthur <mumrah@gmail.com>2013-04-02 20:19:30 -0400
commit3499e2f6ead76e1c2db6ac754358bd57f9a15268 (patch)
tree8d7f827f2caabedadb68ad37cee4dcc908d9cd11
parent1b721325fe6b170cdfe001749dbd7b750fe59512 (diff)
downloadkafka-python-3499e2f6ead76e1c2db6ac754358bd57f9a15268.tar.gz
Some work on a simple consumer
-rw-r--r--kafka/client.py36
-rw-r--r--test/integration.py64
-rw-r--r--test/resources/server.properties2
3 files changed, 91 insertions, 11 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 3f1fa39..8dfc4f0 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -685,7 +685,7 @@ class KafkaClient(object):
for produce_response in KafkaProtocol.decode_produce_response(response):
# Check for errors
if fail_on_error == True and produce_response.error != ErrorMapping.NO_ERROR:
- raise Exception("ProduceRequest for %s failed with errorcode=%d",
+ raise Exception("ProduceRequest for %s failed with errorcode=%d" %
(TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error))
# Run the callback
if callback is not None:
@@ -825,6 +825,9 @@ class SimpleProducer(object):
resp = self.client.send_produce_request([req]).next()
class SimpleConsumer(object):
+ """
+ A simple consumer implementation that consumes all partitions for a topic
+ """
def __init__(self, client, group, topic):
self.client = client
self.topic = topic
@@ -832,7 +835,7 @@ class SimpleConsumer(object):
self.client.load_metadata_for_topics(topic)
self.offsets = {}
- def get_or_init_offset(resp):
+ def get_or_init_offset_callback(resp):
if resp.error == ErrorMapping.NO_ERROR:
return resp.offset
elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
@@ -843,8 +846,33 @@ class SimpleConsumer(object):
for partition in self.client.topic_partitions[topic]:
req = OffsetFetchRequest(topic, partition)
- (offset,) = self.client.send_offset_fetch_request(group, [req], callback=get_or_init_offset, fail_on_error=False)
+ (offset,) = self.client.send_offset_fetch_request(group, [req],
+ callback=get_or_init_offset_callback, fail_on_error=False)
self.offsets[partition] = offset
- print self.offsets
+ def __iter__(self):
+ iters = {}
+ for partition, offset in self.offsets.items():
+ iters[partition] = self.__iter_partition__(partition, offset)
+
+ while True:
+ for it in iters.values():
+ yield it.next()
+
+ def __iter_partition__(self, partition, offset):
+ while True:
+ req = FetchRequest(self.topic, partition, offset, 1024)
+ (resp,) = self.client.send_fetch_request([req])
+ assert resp.topic == self.topic
+ assert resp.partition == partition
+ next_offset = None
+ for message in resp.messages:
+ next_offset = message.offset
+ yield message
+ if next_offset is None:
+ raise StopIteration("No more messages")
+ else:
+ offset = next_offset + 1
+ # Commit offset here?
+
diff --git a/test/integration.py b/test/integration.py
index 91917e6..0f4d9f1 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -35,8 +35,10 @@ def build_kafka_classpath():
return cp
class KafkaFixture(Thread):
- def __init__(self, host, port):
+ def __init__(self, host, port, broker_id, zk_chroot=None):
Thread.__init__(self)
+ self.broker_id = broker_id
+ self.zk_chroot = zk_chroot
self.port = port
self.capture = ""
self.shouldDie = Event()
@@ -50,19 +52,24 @@ class KafkaFixture(Thread):
stdout = open(os.path.join(logDir, 'stdout'), 'w')
# Create the config file
- zkChroot = "kafka-python_%s" % self.tmpDir.replace("/", "_")
+ if self.zk_chroot is None:
+ self.zk_chroot= "kafka-python_%s" % self.tmpDir.replace("/", "_")
logConfig = "test/resources/log4j.properties"
configFile = os.path.join(self.tmpDir, 'server.properties')
f = open('test/resources/server.properties', 'r')
props = f.read()
f = open(configFile, 'w')
- f.write(props % {'kafka.port': self.port, 'kafka.tmp.dir': logDir, 'kafka.partitions': 2, 'zk.chroot': zkChroot})
+ f.write(props % {'broker.id': self.broker_id,
+ 'kafka.port': self.port,
+ 'kafka.tmp.dir': logDir,
+ 'kafka.partitions': 2,
+ 'zk.chroot': self.zk_chroot})
f.close()
cp = build_kafka_classpath()
# Create the Zookeeper chroot
- args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, zkChroot))
+ args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot))
proc = subprocess.Popen(args)
ret = proc.wait()
assert ret == 0
@@ -123,7 +130,7 @@ class TestKafkaClient(unittest.TestCase):
cls.client = KafkaClient(host, port)
else:
port = get_open_port()
- cls.server = KafkaFixture("localhost", port)
+ cls.server = KafkaFixture("localhost", port, 0)
cls.server.start()
cls.server.wait_for("Kafka server started")
cls.client = KafkaClient("localhost", port)
@@ -367,10 +374,55 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(len(messages), 1)
self.assertEquals(messages[0].message.value, "two")
- # Consumer Tests
+class TestConsumer(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ # Broker 0
+ port = get_open_port()
+ cls.server1 = KafkaFixture("localhost", port, 0)
+ cls.server1.start()
+ cls.server1.wait_for("Kafka server started")
+
+ # Broker 1
+ zk = cls.server1.zk_chroot
+ port = get_open_port()
+ cls.server2 = KafkaFixture("localhost", port, 1, zk)
+ cls.server2.start()
+ cls.server2.wait_for("Kafka server started")
+
+ # Client bootstraps from broker 1
+ cls.client = KafkaClient("localhost", port)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.client.close()
+ cls.server1.close()
+ cls.server2.close()
def test_consumer(self):
+ produce1 = ProduceRequest("test_consumer", 0, messages=[
+ KafkaProtocol.create_message("Test message 0 %d" % i) for i in range(100)
+ ])
+
+ produce2 = ProduceRequest("test_consumer", 1, messages=[
+ KafkaProtocol.create_message("Test message 1 %d" % i) for i in range(100)
+ ])
+
+ for resp in self.client.send_produce_request([produce1]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
+ for resp in self.client.send_produce_request([produce2]):
+ self.assertEquals(resp.error, 0)
+ self.assertEquals(resp.offset, 0)
+
consumer = SimpleConsumer(self.client, "group1", "test_consumer")
+ all_messages = []
+ for message in consumer:
+ all_messages.append(message)
+
+ self.assertEquals(len(all_messages), 200)
+ self.assertEquals(len(all_messages), len(set(all_messages))) # make sure there are no dupes
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
diff --git a/test/resources/server.properties b/test/resources/server.properties
index 85bee88..88a6e84 100644
--- a/test/resources/server.properties
+++ b/test/resources/server.properties
@@ -17,7 +17,7 @@
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
-broker.id=0
+broker.id=%(broker.id)d
############################# Socket Server Settings #############################