From a8e423c7e7aa861b2476ed760dada26a024fdacf Mon Sep 17 00:00:00 2001
From: Mahendra M
Date: Mon, 27 May 2013 13:42:47 +0530
Subject: Minor bug fixes

* When you initiate a producer with a non-existent queue, the queue is
  created. However, this partition info is not reflected in KafkaClient()
  immediately. So, we wait for a second and try loading it again. Without
  this fix, calling producer.send_messages() right after creating a new
  queue makes the library throw a StopIteration exception.

* In SimpleConsumer(), the defaults are not as mentioned in the comments.
  Fix this (or do we change the documentation?)

* There was a problem with the way the consumer iterator worked. For
  example, assume that there are 10 messages in the queue/topic and you
  iterate over them as

      for msg in consumer:
          print(msg)

  At the end of this, the saved 'offset' is 10. So, if you run the above
  loop again, the last message (10) is repeated. This is fixed by
  adjusting the offset counter before fetching the message (see the
  sketch after this list).

* Avoid some code repetition in consumer.commit()

* Fix a bug in the send_offset_commit_request() invocation in consumer.py

* Fix missing imports
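
As a rough illustration of the iterator scenario above, here is a minimal
usage sketch. The broker address, group and topic names are made up for
illustration; the sketch assumes the KafkaClient(host, port) and
SimpleConsumer(client, group, topic) constructors as they exist in this
tree:

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    # Hypothetical broker, group and topic names, for illustration only
    kafka = KafkaClient("localhost", 9092)
    consumer = SimpleConsumer(kafka, "my-group", "my-topic")

    # First pass: consume all 10 messages; the last offset gets committed
    for msg in consumer:
        print(msg)

    # Second pass: previously the last message was yielded again, because
    # fetching restarted at the committed offset; with the offset bumped
    # before the fetch, nothing is repeated
    for msg in consumer:
        print(msg)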
---
 kafka/client.py   |  6 ++++++
 kafka/consumer.py | 41 +++++++++++++++++++++++++++++------------
 2 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/kafka/client.py b/kafka/client.py
index 9893737..eb2c25c 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -65,6 +65,12 @@ class KafkaClient(object):
         self.brokers.update(brokers)
         self.topics_to_brokers = {}
         for topic, partitions in topics.items():
+            if not partitions:
+                log.info("Partition is unassigned, delay for 1s and retry")
+                time.sleep(1)
+                self._load_metadata_for_topics(topic)
+                break
+
             for partition, meta in partitions.items():
                 if meta.leader == -1:
                     log.info("Partition is unassigned, delay for 1s and retry")
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 93da316..7f6a6f0 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -7,8 +7,15 @@ from kafka.common import (
     OffsetRequest, OffsetFetchRequest, OffsetCommitRequest
 )
 
+from kafka.util import (
+    ReentrantTimer
+)
+
 log = logging.getLogger("kafka")
 
+AUTO_COMMIT_MSG_COUNT = 100
+AUTO_COMMIT_INTERVAL = 5000
+
 class SimpleConsumer(object):
     """
     A simple consumer implementation that consumes all partitions for a topic
@@ -27,7 +34,9 @@ class SimpleConsumer(object):
     manual call to commit will also reset these triggers
     """
 
-    def __init__(self, client, group, topic, auto_commit=False, auto_commit_every_n=None, auto_commit_every_t=None):
+    def __init__(self, client, group, topic, auto_commit=True,
+                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
+                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):
         self.client = client
         self.topic = topic
         self.group = group
@@ -151,19 +160,21 @@ class SimpleConsumer(object):
         with self.commit_lock:
             reqs = []
             if len(partitions) == 0: # commit all partitions
-                for partition, offset in self.offsets.items():
-                    log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
-                        offset, self.group, self.topic, partition))
-                    reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
-            else:
-                for partition in partitions:
-                    offset = self.offsets[partition]
-                    log.debug("Commit offset %d in SimpleConsumer: group=%s, topic=%s, partition=%s" % (
-                        offset, self.group, self.topic, partition))
-                    reqs.append(OffsetCommitRequest(self.topic, partition, offset, None))
-            resps = self.send_offset_commit_request(self.group, reqs)
+                partitions = self.offsets.keys()
+
+            for partition in partitions:
+                offset = self.offsets[partition]
+                log.debug("Commit offset %d in SimpleConsumer: "
+                          "group=%s, topic=%s, partition=%s" %
+                          (offset, self.group, self.topic, partition))
+
+                reqs.append(OffsetCommitRequest(self.topic, partition,
+                                                offset, None))
+
+            resps = self.client.send_offset_commit_request(self.group, reqs)
             for resp in resps:
                 assert resp.error == 0
+
             self.count_since_commit = 0
 
     def __iter__(self):
@@ -207,6 +218,12 @@ class SimpleConsumer(object):
        a batch of messages, yield them one at a time. After a batch is exhausted,
        start a new batch unless we've reached the end of ths partition.
        """
+
+        # Unless it is the first message in the queue, we have to fetch
+        # the next one
+        if offset != 0:
+            offset += 1
+
         while True:
             req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size
             (resp,) = self.client.send_fetch_request([req])
-- 
cgit v1.2.1