summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-07 17:14:49 -0800
committerDana Powers <dana.powers@rd.io>2016-01-07 17:14:56 -0800
commitd4e85ecd1d8acac1a0f74d164b67faefd99987e4 (patch)
tree04d754bbd47230cd0c979926a0730750005d5e2d
parent2a2e77aa1e5c31b3e815d573051bb2019daaa306 (diff)
downloadkafka-python-d4e85ecd1d8acac1a0f74d164b67faefd99987e4.tar.gz
Update docs for release w/ new async classes
-rw-r--r--docs/apidoc/BrokerConnection.rst5
-rw-r--r--docs/apidoc/KafkaClient.rst5
-rw-r--r--docs/apidoc/KafkaConsumer.rst5
-rw-r--r--docs/apidoc/KafkaProducer.rst4
-rw-r--r--docs/apidoc/SimpleProducer.rst14
-rw-r--r--docs/apidoc/modules.rst11
-rw-r--r--docs/compatibility.rst14
-rw-r--r--docs/conf.py2
-rw-r--r--docs/index.rst106
-rw-r--r--docs/install.rst12
-rw-r--r--docs/license.rst10
-rw-r--r--docs/support.rst11
-rw-r--r--docs/tests.rst76
-rw-r--r--docs/usage.rst246
14 files changed, 283 insertions, 238 deletions
diff --git a/docs/apidoc/BrokerConnection.rst b/docs/apidoc/BrokerConnection.rst
new file mode 100644
index 0000000..c56cf42
--- /dev/null
+++ b/docs/apidoc/BrokerConnection.rst
@@ -0,0 +1,5 @@
+BrokerConnection
+================
+
+.. autoclass:: kafka.BrokerConnection
+ :members:
diff --git a/docs/apidoc/KafkaClient.rst b/docs/apidoc/KafkaClient.rst
new file mode 100644
index 0000000..5c9d736
--- /dev/null
+++ b/docs/apidoc/KafkaClient.rst
@@ -0,0 +1,5 @@
+KafkaClient
+===========
+
+.. autoclass:: kafka.KafkaClient
+ :members:
diff --git a/docs/apidoc/KafkaConsumer.rst b/docs/apidoc/KafkaConsumer.rst
new file mode 100644
index 0000000..39062c6
--- /dev/null
+++ b/docs/apidoc/KafkaConsumer.rst
@@ -0,0 +1,5 @@
+KafkaConsumer
+=============
+
+.. autoclass:: kafka.KafkaConsumer
+ :members:
diff --git a/docs/apidoc/KafkaProducer.rst b/docs/apidoc/KafkaProducer.rst
new file mode 100644
index 0000000..c33b2f9
--- /dev/null
+++ b/docs/apidoc/KafkaProducer.rst
@@ -0,0 +1,4 @@
+KafkaProducer
+=============
+
+<unreleased> See :class:`kafka.producer.SimpleProducer`
diff --git a/docs/apidoc/SimpleProducer.rst b/docs/apidoc/SimpleProducer.rst
new file mode 100644
index 0000000..a509858
--- /dev/null
+++ b/docs/apidoc/SimpleProducer.rst
@@ -0,0 +1,14 @@
+SimpleProducer
+==============
+
+.. autoclass:: kafka.producer.SimpleProducer
+ :members:
+ :show-inheritance:
+
+.. autoclass:: kafka.producer.KeyedProducer
+ :members:
+ :show-inheritance:
+
+.. automodule:: kafka.producer.base
+ :members:
+ :show-inheritance:
diff --git a/docs/apidoc/modules.rst b/docs/apidoc/modules.rst
index db3e580..f6eb798 100644
--- a/docs/apidoc/modules.rst
+++ b/docs/apidoc/modules.rst
@@ -1,7 +1,10 @@
-kafka
-=====
+kafka-python API
+****************
.. toctree::
- :maxdepth: 4
- kafka
+ KafkaConsumer
+ KafkaProducer
+ KafkaClient
+ BrokerConnection
+ SimpleProducer
diff --git a/docs/compatibility.rst b/docs/compatibility.rst
new file mode 100644
index 0000000..ccc4b96
--- /dev/null
+++ b/docs/compatibility.rst
@@ -0,0 +1,14 @@
+Compatibility
+-------------
+
+.. image:: https://img.shields.io/badge/kafka-0.9%2C%200.8.2%2C%200.8.1%2C%200.8-brightgreen.svg
+ :target: https://kafka-python.readthedocs.org/compatibility.html
+.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
+ :target: https://pypi.python.org/pypi/kafka-python
+
+kafka-python is compatible with (and tested against) broker versions 0.9.0.0
+through 0.8.0 . kafka-python is not compatible with the 0.8.2-beta release.
+
+kafka-python is tested on python 2.6, 2.7, 3.3, 3.4, 3.5, and pypy.
+
+Builds and tests via Travis-CI. See https://travis-ci.org/dpkp/kafka-python
diff --git a/docs/conf.py b/docs/conf.py
index 805c729..66f9663 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -49,7 +49,7 @@ master_doc = 'index'
# General information about the project.
project = u'kafka-python'
-copyright = u'2015 - David Arthur, Dana Powers, and Contributors'
+copyright = u'2016 -- Dana Powes, David Arthur, and Contributors'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
diff --git a/docs/index.rst b/docs/index.rst
index fa77a8e..f65d4db 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,66 +1,86 @@
kafka-python
-============
+############
-This module provides low-level protocol support for Apache Kafka as well as
-high-level consumer and producer classes. Request batching is supported by the
-protocol as well as broker-aware request routing. Gzip and Snappy compression
-is also supported for message sets.
+.. image:: https://img.shields.io/badge/kafka-0.9%2C%200.8.2%2C%200.8.1%2C%200.8-brightgreen.svg
+ :target: https://kafka-python.readthedocs.org/compatibility.html
+.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
+ :target: https://pypi.python.org/pypi/kafka-python
+.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
+ :target: https://coveralls.io/github/dpkp/kafka-python?branch=master
+.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master
+ :target: https://travis-ci.org/dpkp/kafka-python
+.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
+ :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
-Coordinated Consumer Group support is under development - see Issue #38.
+>>> pip install kafka-python
-On Freenode IRC at #kafka-python, as well as #apache-kafka
+kafka-python is a client for the Apache Kafka distributed stream processing
+system. It is designed to function much like the official java client, with a
+sprinkling of pythonic interfaces (e.g., iterators).
-For general discussion of kafka-client design and implementation (not python specific),
-see https://groups.google.com/forum/m/#!forum/kafka-clients
-For information about Apache Kafka generally, see https://kafka.apache.org/
+KafkaConsumer
+*************
-Status
-------
+>>> from kafka import KafkaConsumer
+>>> consumer = KafkaConsumer('my_favorite_topic')
+>>> for msg in consumer:
+... print (msg)
-The current stable version of this package is `0.9.5 <https://github.com/dpkp/kafka-python/releases/tag/v0.9.5>`_ and is compatible with:
+:class:`~kafka.consumer.KafkaConsumer` is a full-featured,
+high-level message consumer class that is similar in design and function to the
+new 0.9 java consumer. Most configuration parameters defined by the official
+java client are supported as optional kwargs, with generally similar behavior.
+Gzip and Snappy compressed messages are supported transparently.
-Kafka broker versions
+In addition to the standard
+:meth:`~kafka.consumer.KafkaConsumer.poll` interface (which returns
+micro-batches of messages, grouped by topic-partition), kafka-python supports
+single-message iteration, yielding :class:`~kafka.consumer.ConsumerRecord`
+namedtuples, which include the topic, partition, offset, key, and value of each
+message.
-* 0.9.0.0
-* 0.8.2.2
-* 0.8.2.1
-* 0.8.1.1
-* 0.8.1
-* 0.8.0
+By default, :class:`~kafka.consumer.KafkaConsumer` will attempt to auto-commit
+message offsets every 5 seconds. When used with 0.9 kafka brokers,
+:class:`~kafka.consumer.KafkaConsumer` will dynamically assign partitions using
+the kafka GroupCoordinator APIs and a
+:class:`~kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor`
+partitioning strategy, enabling relatively straightforward parallel consumption
+patterns. See :doc:`usage` for examples.
-Python versions
-* 3.5 (tested on 3.5.0)
-* 3.4 (tested on 3.4.2)
-* 3.3 (tested on 3.3.5)
-* 2.7 (tested on 2.7.9)
-* 2.6 (tested on 2.6.9)
-* pypy (tested on pypy 2.5.0 / python 2.7.8)
+KafkaProducer
+*************
-License
--------
+TBD
-Apache License, v2.0. See `LICENSE <https://github.com/dpkp/kafka-python/blob/master/LICENSE>`_.
-Copyright 2015, David Arthur, Dana Powers, and Contributors
-(See `AUTHORS <https://github.com/dpkp/kafka-python/blob/master/AUTHORS.md>`_).
+Protocol
+********
+A secondary goal of kafka-python is to provide an easy-to-use protocol layer
+for interacting with kafka brokers via the python repl. This is useful for
+testing, probing, and general experimentation. The protocol support is
+leveraged to enable a :meth:`~kafka.KafkaClient.check_version()`
+method that probes a kafka broker and
+attempts to identify which version it is running (0.8.0 to 0.9).
+
+
+Low-level
+*********
+
+Legacy support is maintained for low-level consumer and producer classes,
+SimpleConsumer and SimpleProducer.
-Contents
---------
.. toctree::
+ :hidden:
:maxdepth: 2
- usage
+ Usage Overview <usage>
+ API </apidoc/modules>
install
tests
- API reference </apidoc/modules>
-
-Indices and tables
-==================
-
-* :ref:`genindex`
-* :ref:`modindex`
-* :ref:`search`
+ compatibility
+ support
+ license
diff --git a/docs/install.rst b/docs/install.rst
index 2bc6911..bf49c3f 100644
--- a/docs/install.rst
+++ b/docs/install.rst
@@ -1,10 +1,10 @@
Install
-=======
+#######
Install with your favorite package manager
Latest Release
---------------
+**************
Pip:
.. code:: bash
@@ -15,7 +15,7 @@ Releases are also listed at https://github.com/dpkp/kafka-python/releases
Bleeding-Edge
--------------
+*************
.. code:: bash
@@ -39,10 +39,10 @@ Using `setup.py` directly:
Optional Snappy install
------------------------
+***********************
Install Development Libraries
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+=============================
Download and build Snappy from http://code.google.com/p/snappy/downloads/list
@@ -70,7 +70,7 @@ From Source:
sudo make install
Install Python Module
-^^^^^^^^^^^^^^^^^^^^^
+=====================
Install the `python-snappy` module
diff --git a/docs/license.rst b/docs/license.rst
new file mode 100644
index 0000000..13df48c
--- /dev/null
+++ b/docs/license.rst
@@ -0,0 +1,10 @@
+License
+-------
+
+.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
+ :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
+
+Apache License, v2.0. See `LICENSE <https://github.com/dpkp/kafka-python/blob/master/LICENSE>`_.
+
+Copyright 2016, David Arthur, Dana Powers, and Contributors
+(See `AUTHORS <https://github.com/dpkp/kafka-python/blob/master/AUTHORS.md>`_).
diff --git a/docs/support.rst b/docs/support.rst
new file mode 100644
index 0000000..63d4a86
--- /dev/null
+++ b/docs/support.rst
@@ -0,0 +1,11 @@
+Support
+-------
+
+For support, see github issues at https://github.com/dpkp/kafka-python
+
+Limited IRC chat at #kafka-python on freenode (general chat is #apache-kafka).
+
+For information about Apache Kafka generally, see https://kafka.apache.org/
+
+For general discussion of kafka-client design and implementation (not python
+specific), see https://groups.google.com/forum/m/#!forum/kafka-clients
diff --git a/docs/tests.rst b/docs/tests.rst
index df9a3ef..e5dd269 100644
--- a/docs/tests.rst
+++ b/docs/tests.rst
@@ -1,59 +1,83 @@
Tests
=====
-Run the unit tests
-------------------
+.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
+ :target: https://coveralls.io/github/dpkp/kafka-python?branch=master
+.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master
+ :target: https://travis-ci.org/dpkp/kafka-python
-.. code:: bash
+Test environments are managed via tox. The test suite is run via pytest.
+Individual tests are written using unittest, pytest, and in some cases,
+doctest.
+
+Linting is run via pylint, but is generally skipped on python2.6 and pypy
+due to pylint compatibility / performance issues.
+
+For test coverage details, see https://coveralls.io/github/dpkp/kafka-python
- tox
+The test suite includes unit tests that mock network interfaces, as well as
+integration tests that setup and teardown kafka broker (and zookeeper)
+fixtures for client / consumer / producer testing.
+
+
+Unit tests
+------------------
+To run the tests locally, install tox -- `pip install tox`
+See http://tox.readthedocs.org/en/latest/install.html
-Run a subset of unit tests
---------------------------
+Then simply run tox, optionally setting the python environment.
+If unset, tox will loop through all environments.
.. code:: bash
+ tox -e py27
+ tox -e py35
+
# run protocol tests only
tox -- -v test.test_protocol
- # test with pypy only
- tox -e pypy
+ # re-run the last failing test, dropping into pdb
+ tox -e py27 -- --lf --pdb
+
+ # see available (pytest) options
+ tox -e py27 -- --help
- # Run only 1 test, and use python 2.7
- tox -e py27 -- -v --with-id --collect-only
- # pick a test number from the list like #102
- tox -e py27 -- -v --with-id 102
+Integration tests
+-----------------
+.. code:: bash
-Run the integration tests
--------------------------
+ KAFKA_VERSION=0.9.0.0 tox -e py27
+ KAFKA_VERSION=0.8.2.2 tox -e py35
-The integration tests will actually start up real local Zookeeper
-instance and Kafka brokers, and send messages in using the client.
-First, get the kafka binaries for integration testing:
+Integration tests start Kafka and Zookeeper fixtures. This requires downloading
+kafka server binaries:
.. code:: bash
./build_integration.sh
-By default, the build_integration.sh script will download binary
-distributions for all supported kafka versions.
-To test against the latest source build, set KAFKA_VERSION=trunk
-and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended)
+By default, this will install 0.8.1.1, 0.8.2.2, and 0.9.0.0 brokers into the
+servers/ directory. To install a specific version, set `KAFKA_VERSION=1.2.3`:
.. code:: bash
- SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
+ KAFKA_VERSION=0.8.0 ./build_integration.sh
Then run the tests against supported Kafka versions, simply set the `KAFKA_VERSION`
env variable to the server build you want to use for testing:
.. code:: bash
- KAFKA_VERSION=0.8.0 tox
- KAFKA_VERSION=0.8.1 tox
- KAFKA_VERSION=0.8.1.1 tox
- KAFKA_VERSION=trunk tox
+ KAFKA_VERSION=0.9.0.0 tox -e py27
+
+To test against the kafka source tree, set KAFKA_VERSION=trunk
+[optionally set SCALA_VERSION (defaults to 2.10)]
+
+.. code:: bash
+
+ SCALA_VERSION=2.11 KAFKA_VERSION=trunk ./build_integration.sh
+ KAFKA_VERSION=trunk tox -e py35
diff --git a/docs/usage.rst b/docs/usage.rst
index 6417cd8..e74e5af 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -1,68 +1,126 @@
Usage
-=====
+*****
-SimpleProducer
---------------
+
+KafkaConsumer
+=============
.. code:: python
- from kafka import SimpleProducer, KafkaClient
+ from kafka import KafkaConsumer
- # To send messages synchronously
- kafka = KafkaClient('localhost:9092')
- producer = SimpleProducer(kafka)
+ # To consume latest messages and auto-commit offsets
+ consumer = KafkaConsumer('my-topic',
+ group_id='my-group',
+ bootstrap_servers=['localhost:9092'])
+ for message in consumer:
+ # message value and key are raw bytes -- decode if necessary!
+ # e.g., for unicode: `message.value.decode('utf-8')`
+ print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
+ message.offset, message.key,
+ message.value))
- # Note that the application is responsible for encoding messages to type bytes
- producer.send_messages(b'my-topic', b'some message')
- producer.send_messages(b'my-topic', b'this method', b'is variadic')
+ # consume earliest available messages, dont commit offsets
+ KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
- # Send unicode message
- producer.send_messages(b'my-topic', u'你怎么样?'.encode('utf-8'))
+ # consume json messages
+ KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
+
+ # consume msgpack
+ KafkaConsumer(value_deserializer=msgpack.unpackb)
+
+ # StopIteration if no message after 1sec
+ KafkaConsumer(consumer_timeout_ms=1000)
+
+ # Subscribe to a regex topic pattern
+ consumer = KafkaConsumer()
+ consumer.subscribe(pattern='^awesome.*')
+
+ # Use multiple consumers in parallel w/ 0.9 kafka brokers
+ # typically you would run each on a different server / process / CPU
+ consumer1 = KafkaConsumer('my-topic',
+ group_id='my-group',
+ bootstrap_servers='my.server.com')
+ consumer2 = KafkaConsumer('my-topic',
+ group_id='my-group',
+ bootstrap_servers='my.server.com')
+
+
+There are many configuration options for the consumer class. See
+:class:`~kafka.KafkaConsumer` API documentation for more details.
+
+
+SimpleProducer
+==============
Asynchronous Mode
-----------------
.. code:: python
+ from kafka import SimpleProducer, SimpleClient
+
# To send messages asynchronously
- producer = SimpleProducer(kafka, async=True)
- producer.send_messages(b'my-topic', b'async message')
+ client = SimpleClient('localhost:9092')
+ producer = SimpleProducer(client, async=True)
+ producer.send_messages('my-topic', b'async message')
+
+ # To send messages in batch. You can use any of the available
+ # producers for doing this. The following producer will collect
+ # messages in batch and send them to Kafka after 20 messages are
+ # collected or every 60 seconds
+ # Notes:
+ # * If the producer dies before the messages are sent, there will be losses
+ # * Call producer.stop() to send the messages and cleanup
+ producer = SimpleProducer(client,
+ async=True,
+ batch_send_every_n=20,
+ batch_send_every_t=60)
+
+Synchronous Mode
+----------------
+
+.. code:: python
+
+ from kafka import SimpleProducer, SimpleClient
+
+ # To send messages synchronously
+ client = SimpleClient('localhost:9092')
+ producer = SimpleProducer(client, async=False)
+
+ # Note that the application is responsible for encoding messages to type bytes
+ producer.send_messages('my-topic', b'some message')
+ producer.send_messages('my-topic', b'this method', b'is variadic')
+
+ # Send unicode message
+ producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))
# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
# a local log before sending response
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
# by all in sync replicas before sending a response
- producer = SimpleProducer(kafka, async=False,
+ producer = SimpleProducer(client,
+ async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=2000,
sync_fail_on_error=False)
- responses = producer.send_messages(b'my-topic', b'another message')
+ responses = producer.send_messages('my-topic', b'another message')
for r in responses:
logging.info(r.offset)
- # To send messages in batch. You can use any of the available
- # producers for doing this. The following producer will collect
- # messages in batch and send them to Kafka after 20 messages are
- # collected or every 60 seconds
- # Notes:
- # * If the producer dies before the messages are sent, there will be losses
- # * Call producer.stop() to send the messages and cleanup
- producer = SimpleProducer(kafka, async=True,
- batch_send_every_n=20,
- batch_send_every_t=60)
-Keyed messages
---------------
+KeyedProducer
+=============
.. code:: python
from kafka import (
- KafkaClient, KeyedProducer,
+ SimpleClient, KeyedProducer,
Murmur2Partitioner, RoundRobinPartitioner)
- kafka = KafkaClient('localhost:9092')
+ kafka = SimpleClient('localhost:9092')
# HashedPartitioner is default (currently uses python hash())
producer = KeyedProducer(kafka)
@@ -74,131 +132,3 @@ Keyed messages
# Or just produce round-robin (or just use SimpleProducer)
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
-
-
-
-KafkaConsumer
--------------
-
-.. code:: python
-
- from kafka import KafkaConsumer
-
- # To consume messages
- consumer = KafkaConsumer('my-topic',
- group_id='my_group',
- bootstrap_servers=['localhost:9092'])
- for message in consumer:
- # message value is raw byte string -- decode if necessary!
- # e.g., for unicode: `message.value.decode('utf-8')`
- print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
- message.offset, message.key,
- message.value))
-
-
-messages (m) are namedtuples with attributes:
-
- * `m.topic`: topic name (str)
- * `m.partition`: partition number (int)
- * `m.offset`: message offset on topic-partition log (int)
- * `m.key`: key (bytes - can be None)
- * `m.value`: message (output of deserializer_class - default is raw bytes)
-
-
-.. code:: python
-
- from kafka import KafkaConsumer
-
- # more advanced consumer -- multiple topics w/ auto commit offset
- # management
- consumer = KafkaConsumer('topic1', 'topic2',
- bootstrap_servers=['localhost:9092'],
- group_id='my_consumer_group',
- auto_commit_enable=True,
- auto_commit_interval_ms=30 * 1000,
- auto_offset_reset='smallest')
-
- # Infinite iteration
- for m in consumer:
- do_some_work(m)
-
- # Mark this message as fully consumed
- # so it can be included in the next commit
- #
- # **messages that are not marked w/ task_done currently do not commit!
- consumer.task_done(m)
-
- # If auto_commit_enable is False, remember to commit() periodically
- consumer.commit()
-
- # Batch process interface
- while True:
- for m in kafka.fetch_messages():
- process_message(m)
- consumer.task_done(m)
-
-
- Configuration settings can be passed to constructor,
- otherwise defaults will be used:
-
-.. code:: python
-
- client_id='kafka.consumer.kafka',
- group_id=None,
- fetch_message_max_bytes=1024*1024,
- fetch_min_bytes=1,
- fetch_wait_max_ms=100,
- refresh_leader_backoff_ms=200,
- bootstrap_servers=[],
- socket_timeout_ms=30*1000,
- auto_offset_reset='largest',
- deserializer_class=lambda msg: msg,
- auto_commit_enable=False,
- auto_commit_interval_ms=60 * 1000,
- consumer_timeout_ms=-1
-
- Configuration parameters are described in more detail at
- http://kafka.apache.org/documentation.html#highlevelconsumerapi
-
-Multiprocess consumer
----------------------
-
-.. code:: python
-
- from kafka import KafkaClient, MultiProcessConsumer
-
- kafka = KafkaClient('localhost:9092')
-
- # This will split the number of partitions among two processes
- consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic', num_procs=2)
-
- # This will spawn processes such that each handles 2 partitions max
- consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic',
- partitions_per_proc=2)
-
- for message in consumer:
- print(message)
-
- for message in consumer.get_messages(count=5, block=True, timeout=4):
- print(message)
-
-Low level
----------
-
-.. code:: python
-
- from kafka import KafkaClient, create_message
- from kafka.protocol import KafkaProtocol
- from kafka.common import ProduceRequest
-
- kafka = KafkaClient('localhost:9092')
-
- req = ProduceRequest(topic=b'my-topic', partition=1,
- messages=[create_message(b'some message')])
- resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
- kafka.close()
-
- resps[0].topic # b'my-topic'
- resps[0].partition # 1
- resps[0].error # 0 (hopefully)
- resps[0].offset # offset of the first message sent in this request