author    Dana Powers <dana.powers@rd.io>    2015-03-08 23:00:50 -0700
committer Dana Powers <dana.powers@rd.io>    2015-03-29 16:36:36 -0700
commit    484f1a4722145620c0dcd0cb8c72bcb8a3834020 (patch)
tree      e64edacdc937a27653a34af088e09f8c516b45c1 /docs
parent    eb5fd4aaa1b0f099235ad29784e3068e3f29e131 (diff)
download  kafka-python-484f1a4722145620c0dcd0cb8c72bcb8a3834020.tar.gz
Move KafkaConsumer usage examples to docs/usage; Put KeyedProducer usage right after SimpleProducer
Diffstat (limited to 'docs')
-rw-r--r--  docs/usage.rst | 119
1 file changed, 105 insertions(+), 14 deletions(-)
diff --git a/docs/usage.rst b/docs/usage.rst
index 141cf93..acd52dc 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -1,12 +1,12 @@
Usage
=====
-High level
-----------
+SimpleProducer
+--------------
.. code:: python
- from kafka import SimpleProducer, KafkaClient, KafkaConsumer
+ from kafka import SimpleProducer, KafkaClient
# To send messages synchronously
kafka = KafkaClient("localhost:9092")
@@ -51,17 +51,6 @@ High level
batch_send_every_n=20,
batch_send_every_t=60)
- # To consume messages
- consumer = KafkaConsumer("my-topic", group_id="my_group",
- metadata_broker_list=["localhost:9092"])
- for message in consumer:
- # message is raw byte string -- decode if necessary!
- # e.g., for unicode: `message.decode('utf-8')`
- print(message)
-
- kafka.close()
-
-
Keyed messages
--------------
@@ -80,6 +69,108 @@ Keyed messages
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
+
+KafkaConsumer
+-------------
+
+.. code:: python
+
+   from kafka import KafkaConsumer
+
+   # To consume messages
+   consumer = KafkaConsumer("my-topic",
+                            group_id="my_group",
+                            metadata_broker_list=["localhost:9092"])
+   for message in consumer:
+       # message value is a raw byte string -- decode if necessary!
+       # e.g., for unicode: `message.value.decode('utf-8')`
+       print(message)
+
+.. code:: python
+
+   from kafka import KafkaConsumer
+
+   # A very basic 'tail' consumer, with no stored offset management
+   kafka = KafkaConsumer('topic1',
+                         metadata_broker_list=['localhost:9092'])
+   for m in kafka:
+       print(m)
+
+   # Alternate interface: next()
+   print(kafka.next())
+
+   # Alternate interface: batch iteration
+   while True:
+       for m in kafka.fetch_messages():
+           print(m)
+       print("Done with batch - let's do another!")
+
+
+.. code:: python
+
+   from kafka import KafkaConsumer
+
+   # more advanced consumer -- multiple topics w/ auto commit offset
+   # management
+   kafka = KafkaConsumer('topic1', 'topic2',
+                         metadata_broker_list=['localhost:9092'],
+                         group_id='my_consumer_group',
+                         auto_commit_enable=True,
+                         auto_commit_interval_ms=30 * 1000,
+                         auto_offset_reset='smallest')
+
+   # Infinite iteration
+   for m in kafka:
+       process_message(m)
+       kafka.task_done(m)
+
+   # Alternate interface: next()
+   m = kafka.next()
+   process_message(m)
+   kafka.task_done(m)
+
+   # If auto_commit_enable is False, remember to commit() periodically
+   kafka.commit()
+
+   # Batch process interface
+   while True:
+       for m in kafka.fetch_messages():
+           process_message(m)
+           kafka.task_done(m)
+
+
+Messages (m) are namedtuples with the following attributes
+(see the sketch below):
+
+* `m.topic`: topic name (str)
+* `m.partition`: partition number (int)
+* `m.offset`: message offset on topic-partition log (int)
+* `m.key`: key (bytes -- can be None)
+* `m.value`: message payload (output of deserializer_class -- default is raw bytes)
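+
+A minimal sketch of reading these fields off each message (the topic
+name and broker address here are assumptions for illustration):
+
+.. code:: python
+
+   from kafka import KafkaConsumer
+
+   consumer = KafkaConsumer('my-topic',
+                            metadata_broker_list=['localhost:9092'])
+   for m in consumer:
+       # print the coordinates (topic/partition/offset) and payload
+       print("%s:%d:%d: key=%s value=%s" % (m.topic, m.partition,
+                                            m.offset, m.key, m.value))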
+
+Configuration settings can be passed to the constructor;
+otherwise defaults will be used:
+
+.. code:: python
+
+ client_id='kafka.consumer.kafka',
+ group_id=None,
+ fetch_message_max_bytes=1024*1024,
+ fetch_min_bytes=1,
+ fetch_wait_max_ms=100,
+ refresh_leader_backoff_ms=200,
+ metadata_broker_list=None,
+ socket_timeout_ms=30*1000,
+ auto_offset_reset='largest',
+ deserializer_class=lambda msg: msg,
+ auto_commit_enable=False,
+ auto_commit_interval_ms=60 * 1000,
+ consumer_timeout_ms=-1
+
+ Configuration parameters are described in more detail at
+ http://kafka.apache.org/documentation.html#highlevelconsumerapi
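+
+For instance, a minimal sketch of overriding a few of these defaults
+(the topic name and the utf-8 deserializer are assumptions for
+illustration):
+
+.. code:: python
+
+   from kafka import KafkaConsumer
+
+   consumer = KafkaConsumer('my-topic',
+                            metadata_broker_list=['localhost:9092'],
+                            # decode each raw message payload to unicode
+                            deserializer_class=lambda msg: msg.decode('utf-8'),
+                            # stop blocking and raise a timeout if no new
+                            # message arrives within 5 seconds
+                            consumer_timeout_ms=5 * 1000)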
+
Multiprocess consumer
---------------------