Usage
=====

High level
----------

.. code:: python

    from kafka import KafkaClient, SimpleProducer, SimpleConsumer

    # To send messages synchronously
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)

    # Note that the application is responsible for encoding messages to type str
    producer.send_messages("my-topic", "some message")
    producer.send_messages("my-topic", "this method", "is variadic")

    # Send a unicode message
    producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))

    # To send messages asynchronously
    # WARNING: the current implementation does not guarantee message delivery on
    # failure! Messages can get dropped! Use at your own risk! Or help us improve
    # it with a PR!
    producer = SimpleProducer(kafka, async=True)
    producer.send_messages("my-topic", "async message")

    # To wait for acknowledgements:
    # ACK_AFTER_LOCAL_WRITE : the server will wait until the data is written to
    #                         a local log before sending a response
    # ACK_AFTER_CLUSTER_COMMIT : the server will block until the message is
    #                            committed by all in-sync replicas before
    #                            sending a response
    producer = SimpleProducer(kafka, async=False,
                              req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                              ack_timeout=2000)

    response = producer.send_messages("my-topic", "another message")

    if response:
        print(response[0].error)
        print(response[0].offset)

    # To send messages in batches. You can use any of the available
    # producers for doing this. The following producer will collect
    # messages into batches and send them to Kafka after 20 messages are
    # collected or every 60 seconds, whichever comes first.
    # Notes:
    # * If the producer dies before the messages are sent, they will be lost
    # * Call producer.stop() to send the pending messages and clean up
    producer = SimpleProducer(kafka, batch_send=True,
                              batch_send_every_n=20,
                              batch_send_every_t=60)
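
    producer.send_messages("my-topic", "batched message")

    # Illustrative: as noted above, stop() sends any pending batched
    # messages and shuts the producer down cleanly
    producer.stop()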

    # To consume messages
    consumer = SimpleConsumer(kafka, "my-group", "my-topic")
    for message in consumer:
        # message is a raw byte string -- decode if necessary!
        # e.g., for unicode: `message.decode('utf-8')`
        print(message)

    kafka.close()
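
By default the consumer above resumes from its last committed offset. Below is
a minimal sketch of repositioning and manually committing, assuming the
``seek(offset, whence)`` and ``commit()`` methods that ``SimpleConsumer``
exposes in kafka-python releases of this era:

.. code:: python

    from kafka import KafkaClient, SimpleConsumer

    kafka = KafkaClient("localhost:9092")
    consumer = SimpleConsumer(kafka, "my-group", "my-topic")

    consumer.seek(0, 0)   # whence=0: absolute -- rewind to the beginning
    consumer.seek(-5, 2)  # whence=2: relative to the end -- last 5 messages

    for message in consumer.get_messages(count=5):
        print(message)

    consumer.commit()     # record the consumed offsets for "my-group"
    kafka.close()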


Keyed messages
--------------

.. code:: python

    from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner

    kafka = KafkaClient("localhost:9092")

    # HashedPartitioner is the default
    producer = KeyedProducer(kafka)
    producer.send("my-topic", "key1", "some message")
    producer.send("my-topic", "key2", "this method")

    producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
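
A partitioner maps a message key to one of the topic's partitions. Below is a
minimal sketch of a custom partitioner, assuming the ``partition(key,
partitions)`` interface that ``HashedPartitioner`` and
``RoundRobinPartitioner`` implement (the ``kafka.partitioner`` import path and
the class name are assumptions for illustration):

.. code:: python

    from kafka import KafkaClient, KeyedProducer
    from kafka.partitioner import Partitioner  # assumed import path

    class KeyLengthPartitioner(Partitioner):
        # Hypothetical example: route messages by the length of their key
        def partition(self, key, partitions):
            return partitions[len(key) % len(partitions)]

    kafka = KafkaClient("localhost:9092")
    producer = KeyedProducer(kafka, partitioner=KeyLengthPartitioner)
    producer.send("my-topic", "key1", "routed by key length")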


Multiprocess consumer
---------------------

.. code:: python

    from kafka import KafkaClient, MultiProcessConsumer

    kafka = KafkaClient("localhost:9092")

    # This will split the number of partitions among two processes
    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

    # This will spawn processes such that each handles at most two partitions
    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
                                    partitions_per_proc=2)

    for message in consumer:
        print(message)

    for message in consumer.get_messages(count=5, block=True, timeout=4):
        print(message)
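
``get_messages(count=5, block=True, timeout=4)`` above returns up to five
messages, blocking for at most four seconds. A sketch of a non-blocking
polling loop built on the same call, assuming ``block=False`` returns
immediately with whatever is available:

.. code:: python

    import time

    from kafka import KafkaClient, MultiProcessConsumer

    kafka = KafkaClient("localhost:9092")
    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

    while True:
        # Grab up to ten messages without waiting
        batch = consumer.get_messages(count=10, block=False)
        if not batch:
            time.sleep(1)  # nothing pending; back off briefly
            continue
        for message in batch:
            print(message)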

Low level
---------

.. code:: python

    from kafka import KafkaClient, create_message
    from kafka.protocol import KafkaProtocol
    from kafka.common import ProduceRequest

    kafka = KafkaClient("localhost:9092")

    req = ProduceRequest(topic="my-topic", partition=1,
                         messages=[create_message("some message")])
    resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
    kafka.close()

    resps[0].topic      # "my-topic"
    resps[0].partition  # 1
    resps[0].error      # 0 (hopefully)
    resps[0].offset     # offset of the first message sent in this request
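
The same pattern works for reads. Below is a sketch of fetching messages
directly, assuming the ``FetchRequest`` payload and ``send_fetch_request``
call as they exist in kafka-python releases of this era:

.. code:: python

    from kafka import KafkaClient
    from kafka.common import FetchRequest

    kafka = KafkaClient("localhost:9092")

    # Fetch up to 100 KB of messages from partition 1, starting at offset 0
    req = FetchRequest(topic="my-topic", partition=1,
                       offset=0, max_bytes=100 * 1024)
    (resp,) = kafka.send_fetch_request(payloads=[req], fail_on_error=True)
    kafka.close()

    for offset_and_message in resp.messages:
        # Each item carries the message offset and the wrapped message value
        print(offset_and_message.offset, offset_and_message.message.value)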