summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorJesse Myers <jesse@locationlabs.com>2014-08-30 20:43:18 -0700
committerJesse Myers <jesse@locationlabs.com>2014-08-30 20:43:18 -0700
commitd65bc5b70dfa381f650dc4c1e136680b8f6c9649 (patch)
treecdf4dc7e9214696c07319c7efd68f7e7b79f9262 /kafka
parent63e08e643b5d02aacbccab21b72b34ff1798a23a (diff)
downloadkafka-python-d65bc5b70dfa381f650dc4c1e136680b8f6c9649.tar.gz
Improve documentation in example
Diffstat (limited to 'kafka')
-rw-r--r--kafka/transaction.py3
1 files changed, 2 insertions, 1 deletions
diff --git a/kafka/transaction.py b/kafka/transaction.py
index 10c2ebd..0dfe9d4 100644
--- a/kafka/transaction.py
+++ b/kafka/transaction.py
@@ -20,13 +20,14 @@ class KafkaTransaction(object):
consumer = SimpleConsumer(client, group, topic, auto_commit=False)
consumer.provide_partition_info()
+ consumer.fetch_last_known_offsets()
while some_condition:
with KafkaTransaction(consumer) as transaction:
messages = consumer.get_messages(count, block=False)
for partition, message in messages:
- if can_process(message.value):
+ if can_process(message):
transaction.mark(partition, message.offset)
else:
break