author | Dana Powers <dana.powers@rd.io> | 2015-03-08 23:00:50 -0700
committer | Dana Powers <dana.powers@rd.io> | 2015-03-29 16:36:36 -0700
commit | 484f1a4722145620c0dcd0cb8c72bcb8a3834020 (patch)
tree | e64edacdc937a27653a34af088e09f8c516b45c1 /docs
parent | eb5fd4aaa1b0f099235ad29784e3068e3f29e131 (diff)
download | kafka-python-484f1a4722145620c0dcd0cb8c72bcb8a3834020.tar.gz
Move KafkaConsumer usage examples to docs/usage; Put KeyedProducer usage right after SimpleProducer
Diffstat (limited to 'docs')
-rw-r--r-- | docs/usage.rst | 119
1 file changed, 105 insertions, 14 deletions
diff --git a/docs/usage.rst b/docs/usage.rst
index 141cf93..acd52dc 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -1,12 +1,12 @@
 Usage
 =====
 
-High level
-----------
+SimpleProducer
+--------------
 
 .. code:: python
 
-    from kafka import SimpleProducer, KafkaClient, KafkaConsumer
+    from kafka import SimpleProducer, KafkaClient
 
     # To send messages synchronously
     kafka = KafkaClient("localhost:9092")
@@ -51,17 +51,6 @@ High level
                               batch_send_every_n=20,
                               batch_send_every_t=60)
 
-    # To consume messages
-    consumer = KafkaConsumer("my-topic", group_id="my_group",
-                             metadata_broker_list=["localhost:9092"])
-    for message in consumer:
-        # message is raw byte string -- decode if necessary!
-        # e.g., for unicode: `message.decode('utf-8')`
-        print(message)
-
-    kafka.close()
-
-
 Keyed messages
 --------------
 
@@ -80,6 +69,108 @@ Keyed messages
 
     producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
 
+
+KafkaConsumer
+-------------
+
+.. code:: python
+
+    from kafka import KafkaConsumer
+
+    # To consume messages
+    consumer = KafkaConsumer("my-topic",
+                             group_id="my_group",
+                             metadata_broker_list=["localhost:9092"])
+    for message in consumer:
+        # message is raw byte string -- decode if necessary!
+        # e.g., for unicode: `message.decode('utf-8')`
+        print(message)
+
+    kafka.close()
+
+.. code:: python
+
+    from kafka import KafkaConsumer
+
+    # A very basic 'tail' consumer, with no stored offset management
+    kafka = KafkaConsumer('topic1',
+                          metadata_broker_list=['localhost:9092'])
+    for m in kafka:
+        print m
+
+    # Alternate interface: next()
+    print kafka.next()
+
+    # Alternate interface: batch iteration
+    while True:
+        for m in kafka.fetch_messages():
+            print m
+        print "Done with batch - let's do another!"
+
+
+.. code:: python
+
+    from kafka import KafkaConsumer
+
+    # more advanced consumer -- multiple topics w/ auto commit offset
+    # management
+    kafka = KafkaConsumer('topic1', 'topic2',
+                          metadata_broker_list=['localhost:9092'],
+                          group_id='my_consumer_group',
+                          auto_commit_enable=True,
+                          auto_commit_interval_ms=30 * 1000,
+                          auto_offset_reset='smallest')
+
+    # Infinite iteration
+    for m in kafka:
+        process_message(m)
+        kafka.task_done(m)
+
+    # Alternate interface: next()
+    m = kafka.next()
+    process_message(m)
+    kafka.task_done(m)
+
+    # If auto_commit_enable is False, remember to commit() periodically
+    kafka.commit()
+
+    # Batch process interface
+    while True:
+        for m in kafka.fetch_messages():
+            process_message(m)
+            kafka.task_done(m)
+
+
+    messages (m) are namedtuples with attributes:
+
+    * `m.topic`: topic name (str)
+    * `m.partition`: partition number (int)
+    * `m.offset`: message offset on topic-partition log (int)
+    * `m.key`: key (bytes - can be None)
+    * `m.value`: message (output of deserializer_class - default is raw bytes)
+
+    Configuration settings can be passed to constructor,
+    otherwise defaults will be used:
+
+.. code:: python
+
+    client_id='kafka.consumer.kafka',
+    group_id=None,
+    fetch_message_max_bytes=1024*1024,
+    fetch_min_bytes=1,
+    fetch_wait_max_ms=100,
+    refresh_leader_backoff_ms=200,
+    metadata_broker_list=None,
+    socket_timeout_ms=30*1000,
+    auto_offset_reset='largest',
+    deserializer_class=lambda msg: msg,
+    auto_commit_enable=False,
+    auto_commit_interval_ms=60 * 1000,
+    consumer_timeout_ms=-1
+
+    Configuration parameters are described in more detail at
+    http://kafka.apache.org/documentation.html#highlevelconsumerapi
+
 Multiprocess consumer
 ---------------------
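A minimal sketch of the manual-commit pattern that the new KafkaConsumer section documents, combining the batch interface with task_done() and commit(); the broker address, topic, group id, and handle() function are placeholder assumptions, not part of the commit:

.. code:: python

    from kafka import KafkaConsumer

    def handle(m):
        # Placeholder for application logic; m.value is raw bytes by default
        print('%s:%d:%d' % (m.topic, m.partition, m.offset))

    consumer = KafkaConsumer('my-topic',
                             group_id='my_group',
                             metadata_broker_list=['localhost:9092'],
                             auto_commit_enable=False,
                             auto_offset_reset='smallest')

    # Batch process interface: mark each message done, then store offsets
    while True:
        for m in consumer.fetch_messages():
            handle(m)
            consumer.task_done(m)
        # With auto_commit_enable=False, offsets are only stored on commit()
        consumer.commit()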
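Likewise, the deserializer_class setting listed in the defaults above can replace the manual decoding the docs mention; a sketch, assuming the topic carries UTF-8 text:

.. code:: python

    from kafka import KafkaConsumer

    # deserializer_class is applied to each raw message value, so m.value
    # arrives already decoded rather than as bytes
    consumer = KafkaConsumer('my-topic',
                             metadata_broker_list=['localhost:9092'],
                             deserializer_class=lambda msg: msg.decode('utf-8'))

    for m in consumer:
        print(m.value)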