Usage
=====

High level
----------

.. code:: python

    from kafka import KafkaClient, SimpleProducer, SimpleConsumer

    # To send messages synchronously
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)

    # Note that the application is responsible for encoding messages to type str
    producer.send_messages("my-topic", "some message")
    producer.send_messages("my-topic", "this method", "is variadic")

    # Send unicode message
    producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))

    # To send messages asynchronously
    # WARNING: current implementation does not guarantee message delivery on failure!
    # messages can get dropped! Use at your own risk! Or help us improve with a PR!
    producer = SimpleProducer(kafka, async=True)
    producer.send_messages("my-topic", "async message")

    # To wait for acknowledgements
    # ACK_AFTER_LOCAL_WRITE : server will wait until the data is written to
    #                         a local log before sending a response
    # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
    #                            by all in-sync replicas before sending a response
    producer = SimpleProducer(kafka, async=False,
                              req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                              ack_timeout=2000)

    response = producer.send_messages("my-topic", "another message")

    if response:
        print(response[0].error)
        print(response[0].offset)

    # To send messages in batch. You can use any of the available
    # producers for doing this. The following producer will collect
    # messages in batch and send them to Kafka after 20 messages are
    # collected or every 60 seconds
    # Notes:
    # * If the producer dies before the messages are sent, there will be losses
    # * Call producer.stop() to send the pending messages and clean up
    producer = SimpleProducer(kafka, batch_send=True,
                              batch_send_every_n=20,
                              batch_send_every_t=60)

    # To consume messages
    consumer = SimpleConsumer(kafka, "my-group", "my-topic")
    for message in consumer:
        # message is a raw byte string -- decode if necessary!
        # e.g., for unicode: `message.decode('utf-8')`
        print(message)

    kafka.close()

Keyed messages
--------------

.. code:: python

    from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner

    kafka = KafkaClient("localhost:9092")

    # HashedPartitioner is the default
    producer = KeyedProducer(kafka)
    producer.send("my-topic", "key1", "some message")
    producer.send("my-topic", "key2", "this method")

    producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)

Multiprocess consumer
---------------------

.. code:: python

    from kafka import KafkaClient, MultiProcessConsumer

    kafka = KafkaClient("localhost:9092")

    # This will split the number of partitions among two processes
    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

    # This will spawn processes such that each handles 2 partitions max
    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
                                    partitions_per_proc=2)

    for message in consumer:
        print(message)

    for message in consumer.get_messages(count=5, block=True, timeout=4):
        print(message)

Low level
---------

.. code:: python

    from kafka import KafkaClient, create_message
    from kafka.common import ProduceRequest

    kafka = KafkaClient("localhost:9092")

    req = ProduceRequest(topic="my-topic", partition=1,
                         messages=[create_message("some message")])
    resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
    kafka.close()

    resps[0].topic      # "my-topic"
    resps[0].partition  # 1
    resps[0].error      # 0 (hopefully)
    resps[0].offset     # offset of the first message sent in this request
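
For completeness, here is a minimal sketch of the consuming side of the low-level
API. It assumes the ``FetchRequest`` payload from ``kafka.common`` and the
``send_fetch_request`` method on ``KafkaClient``; field names and the response
layout can vary between versions, so treat this as illustrative rather than
authoritative.

.. code:: python

    from kafka import KafkaClient
    from kafka.common import FetchRequest

    kafka = KafkaClient("localhost:9092")

    # Assumed payload fields: fetch up to max_bytes from partition 1 of
    # "my-topic", starting at offset 0
    req = FetchRequest(topic="my-topic", partition=1, offset=0, max_bytes=4096)
    resps = kafka.send_fetch_request(payloads=[req], fail_on_error=True)

    for resp in resps:
        # resp.messages yields (offset, message) pairs; message.value is the
        # raw byte string that was produced
        for offset_and_message in resp.messages:
            print(offset_and_message.offset, offset_and_message.message.value)

    kafka.close()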