summaryrefslogtreecommitdiff
path: root/docs/usage.rst
blob: 141cf93541f965e822b59852f830820aabea0374 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
Usage
=====

High level
----------

.. code:: python

    from kafka import SimpleProducer, KafkaClient, KafkaConsumer

    # To send messages synchronously
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)

    # Note that the application is responsible for encoding messages to type str
    producer.send_messages("my-topic", "some message")
    producer.send_messages("my-topic", "this method", "is variadic")

    # Send a unicode message
    producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8'))

    # To send messages asynchronously
    # WARNING: current implementation does not guarantee message delivery on failure!
    # messages can get dropped! Use at your own risk! Or help us improve with a PR!
    producer = SimpleProducer(kafka, async=True)
    producer.send_messages("my-topic", "async message")

    # To wait for acknowledgements
    # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
    #                         a local log before sending response
    # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
    #                            by all in sync replicas before sending a response
    producer = SimpleProducer(kafka, async=False,
                              req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                              ack_timeout=2000)

    response = producer.send_messages("my-topic", "another message")

    if response:
        print(response[0].error)
        print(response[0].offset)

    # To send messages in batches. You can use any of the available
    # producers for doing this. The following producer will collect
    # messages in batches and send them to Kafka after 20 messages are
    # collected or every 60 seconds
    # Notes:
    # * If the producer dies before the messages are sent, there will be losses
    # * Call producer.stop() to send the messages and cleanup
    producer = SimpleProducer(kafka, batch_send=True,
                              batch_send_every_n=20,
                              batch_send_every_t=60)

    # To consume messages
    consumer = KafkaConsumer("my-topic", group_id="my_group",
                             metadata_broker_list=["localhost:9092"])
    for message in consumer:
        # message is a raw byte string -- decode if necessary!
        # e.g., for unicode: `message.decode('utf-8')`
        print(message)

    kafka.close()


Keyed messages
--------------

.. code:: python

    from kafka import (KafkaClient, KeyedProducer, HashedPartitioner,
                       RoundRobinPartitioner)

    kafka = KafkaClient("localhost:9092")

    # HashedPartitioner is default
    producer = KeyedProducer(kafka)
    producer.send("my-topic", "key1", "some message")
    producer.send("my-topic", "key2", "this method")

    producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)


Multiprocess consumer
---------------------

.. code:: python

    from kafka import KafkaClient, MultiProcessConsumer

    kafka = KafkaClient("localhost:9092")

    # This will split the number of partitions among two processes
    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)

    # This will spawn processes such that each handles 2 partitions max
    consumer = MultiProcessConsumer(kafka, "my-group", "my-topic",
                                    partitions_per_proc=2)

    for message in consumer:
        print(message)

    for message in consumer.get_messages(count=5, block=True, timeout=4):
        print(message)


Low level
---------

.. code:: python

    from kafka import KafkaClient, create_message
    from kafka.protocol import KafkaProtocol
    from kafka.common import ProduceRequest

    kafka = KafkaClient("localhost:9092")

    req = ProduceRequest(topic="my-topic", partition=1,
        messages=[create_message("some message")])
    resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
    kafka.close()

    resps[0].topic      # "my-topic"
    resps[0].partition  # 1
    resps[0].error      # 0 (hopefully)
    resps[0].offset     # offset of the first message sent in this request