diff options
Diffstat (limited to 'docs/usage.rst')
-rw-r--r-- | docs/usage.rst | 122 |
1 file changed, 122 insertions, 0 deletions
diff --git a/docs/usage.rst b/docs/usage.rst new file mode 100644 index 0000000..5f3fcea --- /dev/null +++ b/docs/usage.rst @@ -0,0 +1,122 @@ +Usage +===== + +High level +---------- + +.. code:: python + + from kafka import KafkaClient, SimpleProducer, SimpleConsumer + + # To send messages synchronously + kafka = KafkaClient("localhost:9092") + producer = SimpleProducer(kafka) + + # Note that the application is responsible for encoding messages to type str + producer.send_messages("my-topic", "some message") + producer.send_messages("my-topic", "this method", "is variadic") + + # Send unicode message + producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8')) + + # To send messages asynchronously + # WARNING: current implementation does not guarantee message delivery on failure! + # messages can get dropped! Use at your own risk! Or help us improve with a PR! + producer = SimpleProducer(kafka, async=True) + producer.send_messages("my-topic", "async message") + + # To wait for acknowledgements + # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to + # a local log before sending response + # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed + # by all in sync replicas before sending a response + producer = SimpleProducer(kafka, async=False, + req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, + ack_timeout=2000) + + response = producer.send_messages("my-topic", "another message") + + if response: + print(response[0].error) + print(response[0].offset) + + # To send messages in batch. You can use any of the available + # producers for doing this. 
The following producer will collect + messages in batch and send them to Kafka after 20 messages are + collected or every 60 seconds + Notes: + * If the producer dies before the messages are sent, there will be losses + * Call producer.stop() to send the messages and cleanup + producer = SimpleProducer(kafka, batch_send=True, + batch_send_every_n=20, + batch_send_every_t=60) + + # To consume messages + consumer = SimpleConsumer(kafka, "my-group", "my-topic") + for message in consumer: + # message is raw byte string -- decode if necessary! + # e.g., for unicode: `message.decode('utf-8')` + print(message) + + kafka.close() + + +Keyed messages +-------------- + +.. code:: python + + from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner + + kafka = KafkaClient("localhost:9092") + + # HashedPartitioner is default + producer = KeyedProducer(kafka) + producer.send("my-topic", "key1", "some message") + producer.send("my-topic", "key2", "this method") + + producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) + + +Multiprocess consumer +--------------------- + +.. code:: python + + from kafka import KafkaClient, MultiProcessConsumer + + kafka = KafkaClient("localhost:9092") + + # This will split the number of partitions among two processes + consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2) + + # This will spawn processes such that each handles 2 partitions max + consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", + partitions_per_proc=2) + + for message in consumer: + print(message) + + for message in consumer.get_messages(count=5, block=True, timeout=4): + print(message) + +Low level +--------- + +.. 
code:: python + + from kafka import KafkaClient, create_message + from kafka.protocol import KafkaProtocol + from kafka.common import ProduceRequest + + kafka = KafkaClient("localhost:9092") + + req = ProduceRequest(topic="my-topic", partition=1, + messages=[create_message("some message")]) + resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) + kafka.close() + + resps[0].topic # "my-topic" + resps[0].partition # 1 + resps[0].error # 0 (hopefully) + resps[0].offset # offset of the first message sent in this request |