author     Dana Powers <dana.powers@rd.io>    2016-01-07 17:14:49 -0800
committer  Dana Powers <dana.powers@rd.io>    2016-01-07 17:14:56 -0800
commit     d4e85ecd1d8acac1a0f74d164b67faefd99987e4 (patch)
tree       04d754bbd47230cd0c979926a0730750005d5e2d
parent     2a2e77aa1e5c31b3e815d573051bb2019daaa306 (diff)
download   kafka-python-d4e85ecd1d8acac1a0f74d164b67faefd99987e4.tar.gz
Update docs for release w/ new async classes
-rw-r--r--   docs/apidoc/BrokerConnection.rst     5
-rw-r--r--   docs/apidoc/KafkaClient.rst          5
-rw-r--r--   docs/apidoc/KafkaConsumer.rst        5
-rw-r--r--   docs/apidoc/KafkaProducer.rst        4
-rw-r--r--   docs/apidoc/SimpleProducer.rst      14
-rw-r--r--   docs/apidoc/modules.rst             11
-rw-r--r--   docs/compatibility.rst              14
-rw-r--r--   docs/conf.py                         2
-rw-r--r--   docs/index.rst                     106
-rw-r--r--   docs/install.rst                    12
-rw-r--r--   docs/license.rst                    10
-rw-r--r--   docs/support.rst                    11
-rw-r--r--   docs/tests.rst                      76
-rw-r--r--   docs/usage.rst                     246
14 files changed, 283 insertions, 238 deletions
diff --git a/docs/apidoc/BrokerConnection.rst b/docs/apidoc/BrokerConnection.rst
new file mode 100644
index 0000000..c56cf42
--- /dev/null
+++ b/docs/apidoc/BrokerConnection.rst
@@ -0,0 +1,5 @@
+BrokerConnection
+================
+
+.. autoclass:: kafka.BrokerConnection
+   :members:
diff --git a/docs/apidoc/KafkaClient.rst b/docs/apidoc/KafkaClient.rst
new file mode 100644
index 0000000..5c9d736
--- /dev/null
+++ b/docs/apidoc/KafkaClient.rst
@@ -0,0 +1,5 @@
+KafkaClient
+===========
+
+.. autoclass:: kafka.KafkaClient
+   :members:
diff --git a/docs/apidoc/KafkaConsumer.rst b/docs/apidoc/KafkaConsumer.rst
new file mode 100644
index 0000000..39062c6
--- /dev/null
+++ b/docs/apidoc/KafkaConsumer.rst
@@ -0,0 +1,5 @@
+KafkaConsumer
+=============
+
+.. autoclass:: kafka.KafkaConsumer
+   :members:
diff --git a/docs/apidoc/KafkaProducer.rst b/docs/apidoc/KafkaProducer.rst
new file mode 100644
index 0000000..c33b2f9
--- /dev/null
+++ b/docs/apidoc/KafkaProducer.rst
@@ -0,0 +1,4 @@
+KafkaProducer
+=============
+
+<unreleased> See :class:`kafka.producer.SimpleProducer`
diff --git a/docs/apidoc/SimpleProducer.rst b/docs/apidoc/SimpleProducer.rst
new file mode 100644
index 0000000..a509858
--- /dev/null
+++ b/docs/apidoc/SimpleProducer.rst
@@ -0,0 +1,14 @@
+SimpleProducer
+==============
+
+.. autoclass:: kafka.producer.SimpleProducer
+   :members:
+   :show-inheritance:
+
+.. autoclass:: kafka.producer.KeyedProducer
+   :members:
+   :show-inheritance:
+
+.. automodule:: kafka.producer.base
+   :members:
+   :show-inheritance:
diff --git a/docs/apidoc/modules.rst b/docs/apidoc/modules.rst
index db3e580..f6eb798 100644
--- a/docs/apidoc/modules.rst
+++ b/docs/apidoc/modules.rst
@@ -1,7 +1,10 @@
-kafka
-=====
+kafka-python API
+****************
 
 .. toctree::
-   :maxdepth: 4
 
-   kafka
+   KafkaConsumer
+   KafkaProducer
+   KafkaClient
+   BrokerConnection
+   SimpleProducer
diff --git a/docs/compatibility.rst b/docs/compatibility.rst
new file mode 100644
index 0000000..ccc4b96
--- /dev/null
+++ b/docs/compatibility.rst
@@ -0,0 +1,14 @@
+Compatibility
+-------------
+
+.. image:: https://img.shields.io/badge/kafka-0.9%2C%200.8.2%2C%200.8.1%2C%200.8-brightgreen.svg
+    :target: https://kafka-python.readthedocs.org/compatibility.html
+.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
+    :target: https://pypi.python.org/pypi/kafka-python
+
+kafka-python is compatible with (and tested against) broker versions 0.9.0.0
+through 0.8.0. kafka-python is not compatible with the 0.8.2-beta release.
+
+kafka-python is tested on python 2.6, 2.7, 3.3, 3.4, 3.5, and pypy.
+
+Builds and tests are run via Travis-CI. See https://travis-ci.org/dpkp/kafka-python
diff --git a/docs/conf.py b/docs/conf.py
index 805c729..66f9663 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -49,7 +49,7 @@ master_doc = 'index'
 
 # General information about the project.
 project = u'kafka-python'
-copyright = u'2015 - David Arthur, Dana Powers, and Contributors'
+copyright = u'2016 -- Dana Powers, David Arthur, and Contributors'
 
 # The version info for the project you're documenting, acts as replacement for
 # |version| and |release|, also used in various other places throughout the
diff --git a/docs/index.rst b/docs/index.rst
index fa77a8e..f65d4db 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -1,66 +1,86 @@
 kafka-python
-============
+############
 
-This module provides low-level protocol support for Apache Kafka as well as
-high-level consumer and producer classes. Request batching is supported by the
-protocol as well as broker-aware request routing. Gzip and Snappy compression
-is also supported for message sets.
+.. image:: https://img.shields.io/badge/kafka-0.9%2C%200.8.2%2C%200.8.1%2C%200.8-brightgreen.svg
+    :target: https://kafka-python.readthedocs.org/compatibility.html
+.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
+    :target: https://pypi.python.org/pypi/kafka-python
+.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
+    :target: https://coveralls.io/github/dpkp/kafka-python?branch=master
+.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master
+    :target: https://travis-ci.org/dpkp/kafka-python
+.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
+    :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
 
-Coordinated Consumer Group support is under development - see Issue #38.
+>>> pip install kafka-python
 
-On Freenode IRC at #kafka-python, as well as #apache-kafka
+kafka-python is a client for the Apache Kafka distributed stream processing
+system. It is designed to function much like the official java client, with a
+sprinkling of pythonic interfaces (e.g., iterators).
 
-For general discussion of kafka-client design and implementation (not python specific),
-see https://groups.google.com/forum/m/#!forum/kafka-clients
 
-For information about Apache Kafka generally, see https://kafka.apache.org/
+KafkaConsumer
+*************
 
-Status
-------
+>>> from kafka import KafkaConsumer
+>>> consumer = KafkaConsumer('my_favorite_topic')
+>>> for msg in consumer:
+...     print (msg)
 
-The current stable version of this package is `0.9.5 <https://github.com/dpkp/kafka-python/releases/tag/v0.9.5>`_ and is compatible with:
+:class:`~kafka.consumer.KafkaConsumer` is a full-featured,
+high-level message consumer class that is similar in design and function to the
+new 0.9 java consumer. Most configuration parameters defined by the official
+java client are supported as optional kwargs, with generally similar behavior.
+Gzip and Snappy compressed messages are supported transparently.
 
-Kafka broker versions
+In addition to the standard
+:meth:`~kafka.consumer.KafkaConsumer.poll` interface (which returns
+micro-batches of messages, grouped by topic-partition), kafka-python supports
+single-message iteration, yielding :class:`~kafka.consumer.ConsumerRecord`
+namedtuples, which include the topic, partition, offset, key, and value of each
+message.
 
-* 0.9.0.0
-* 0.8.2.2
-* 0.8.2.1
-* 0.8.1.1
-* 0.8.1
-* 0.8.0
+By default, :class:`~kafka.consumer.KafkaConsumer` will attempt to auto-commit
+message offsets every 5 seconds. When used with 0.9 kafka brokers,
+:class:`~kafka.consumer.KafkaConsumer` will dynamically assign partitions using
+the kafka GroupCoordinator APIs and a
+:class:`~kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor`
+partitioning strategy, enabling relatively straightforward parallel consumption
+patterns. See :doc:`usage` for examples.
 
-Python versions
 
-* 3.5 (tested on 3.5.0)
-* 3.4 (tested on 3.4.2)
-* 3.3 (tested on 3.3.5)
-* 2.7 (tested on 2.7.9)
-* 2.6 (tested on 2.6.9)
-* pypy (tested on pypy 2.5.0 / python 2.7.8)
+KafkaProducer
+*************
 
-License
--------
+TBD
 
-Apache License, v2.0. See `LICENSE <https://github.com/dpkp/kafka-python/blob/master/LICENSE>`_.
-Copyright 2015, David Arthur, Dana Powers, and Contributors
-(See `AUTHORS <https://github.com/dpkp/kafka-python/blob/master/AUTHORS.md>`_).
+Protocol
+********
 
+A secondary goal of kafka-python is to provide an easy-to-use protocol layer
+for interacting with kafka brokers via the python repl. This is useful for
+testing, probing, and general experimentation. The protocol support is
+leveraged to enable a :meth:`~kafka.KafkaClient.check_version()`
+method that probes a kafka broker and
+attempts to identify which version it is running (0.8.0 to 0.9).
+
+
+Low-level
+*********
+
+Legacy support is maintained for low-level consumer and producer classes,
+SimpleConsumer and SimpleProducer.
 
-Contents
---------
 
 .. toctree::
+   :hidden:
    :maxdepth: 2
 
-   usage
+   Usage Overview <usage>
+   API </apidoc/modules>
    install
   tests
-   API reference </apidoc/modules>
-
-Indices and tables
-==================
-
-* :ref:`genindex`
-* :ref:`modindex`
-* :ref:`search`
+   compatibility
+   support
+   license
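The Protocol section added to docs/index.rst above references :meth:`~kafka.KafkaClient.check_version`. A minimal sketch of that probe follows; it assumes a broker listening on localhost:9092 and the constructor kwargs of the new KafkaClient shown in the apidoc stubs, and the printed tuple is illustrative only:

.. code:: python

    from kafka import KafkaClient

    # Probe the broker and guess which protocol version it speaks
    # (sketch; a running broker on localhost:9092 is assumed)
    client = KafkaClient(bootstrap_servers='localhost:9092')
    print(client.check_version())   # e.g. (0, 9)
    client.close()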
diff --git a/docs/install.rst b/docs/install.rst
index 2bc6911..bf49c3f 100644
--- a/docs/install.rst
+++ b/docs/install.rst
@@ -1,10 +1,10 @@
 Install
-=======
+#######
 
 Install with your favorite package manager
 
 Latest Release
---------------
+**************
 Pip:
 
 .. code:: bash
@@ -15,7 +15,7 @@ Releases are also listed at https://github.com/dpkp/kafka-python/releases
 
 
 Bleeding-Edge
--------------
+*************
 
 .. code:: bash
@@ -39,10 +39,10 @@ Using `setup.py` directly:
 
 
 Optional Snappy install
------------------------
+***********************
 
 Install Development Libraries
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+=============================
 
 Download and build Snappy from http://code.google.com/p/snappy/downloads/list
@@ -70,7 +70,7 @@ From Source:
 
     sudo make install
 
 Install Python Module
-^^^^^^^^^^^^^^^^^^^^^
+=====================
 
 Install the `python-snappy` module
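Once both the Snappy C library and the `python-snappy` module described in install.rst above are in place, a quick round-trip check confirms the build (a sketch; it exercises only `python-snappy`, not kafka itself):

.. code:: python

    import snappy

    # compress/decompress round-trip; the assert fails (or the import
    # raises) if the C extension was not built correctly
    payload = b'hello kafka'
    assert snappy.decompress(snappy.compress(payload)) == payload
    print('python-snappy OK')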
diff --git a/docs/license.rst b/docs/license.rst
new file mode 100644
index 0000000..13df48c
--- /dev/null
+++ b/docs/license.rst
@@ -0,0 +1,10 @@
+License
+-------
+
+.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
+    :target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
+
+Apache License, v2.0. See `LICENSE <https://github.com/dpkp/kafka-python/blob/master/LICENSE>`_.
+
+Copyright 2016, David Arthur, Dana Powers, and Contributors
+(See `AUTHORS <https://github.com/dpkp/kafka-python/blob/master/AUTHORS.md>`_).
diff --git a/docs/support.rst b/docs/support.rst
new file mode 100644
index 0000000..63d4a86
--- /dev/null
+++ b/docs/support.rst
@@ -0,0 +1,11 @@
+Support
+-------
+
+For support, see github issues at https://github.com/dpkp/kafka-python
+
+Limited IRC chat at #kafka-python on freenode (general chat is #apache-kafka).
+
+For information about Apache Kafka generally, see https://kafka.apache.org/
+
+For general discussion of kafka-client design and implementation (not python
+specific), see https://groups.google.com/forum/m/#!forum/kafka-clients
diff --git a/docs/tests.rst b/docs/tests.rst
index df9a3ef..e5dd269 100644
--- a/docs/tests.rst
+++ b/docs/tests.rst
@@ -1,59 +1,83 @@
 Tests
 =====
 
-Run the unit tests
-------------------
+.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
+    :target: https://coveralls.io/github/dpkp/kafka-python?branch=master
+.. image:: https://travis-ci.org/dpkp/kafka-python.svg?branch=master
+    :target: https://travis-ci.org/dpkp/kafka-python
 
-.. code:: bash
+Test environments are managed via tox. The test suite is run via pytest.
+Individual tests are written using unittest, pytest, and in some cases,
+doctest.
+
+Linting is run via pylint, but is generally skipped on python2.6 and pypy
+due to pylint compatibility / performance issues.
+
+For test coverage details, see https://coveralls.io/github/dpkp/kafka-python
 
-    tox
+The test suite includes unit tests that mock network interfaces, as well as
+integration tests that setup and teardown kafka broker (and zookeeper)
+fixtures for client / consumer / producer testing.
+
+
+Unit tests
+------------------
+
+To run the tests locally, install tox -- `pip install tox`
+See http://tox.readthedocs.org/en/latest/install.html
 
-Run a subset of unit tests
---------------------------
+Then simply run tox, optionally setting the python environment.
+If unset, tox will loop through all environments.
 
 .. code:: bash
 
+    tox -e py27
+    tox -e py35
+
     # run protocol tests only
     tox -- -v test.test_protocol
 
-    # test with pypy only
-    tox -e pypy
+    # re-run the last failing test, dropping into pdb
+    tox -e py27 -- --lf --pdb
+
+    # see available (pytest) options
+    tox -e py27 -- --help
 
-    # Run only 1 test, and use python 2.7
-    tox -e py27 -- -v --with-id --collect-only
 
-    # pick a test number from the list like #102
-    tox -e py27 -- -v --with-id 102
+Integration tests
+-----------------
 
 .. code:: bash
 
-Run the integration tests
--------------------------
+    KAFKA_VERSION=0.9.0.0 tox -e py27
+    KAFKA_VERSION=0.8.2.2 tox -e py35
 
-The integration tests will actually start up real local Zookeeper
-instance and Kafka brokers, and send messages in using the client.
 
-First, get the kafka binaries for integration testing:
+Integration tests start Kafka and Zookeeper fixtures. This requires downloading
+kafka server binaries:
 
 .. code:: bash
 
     ./build_integration.sh
 
-By default, the build_integration.sh script will download binary
-distributions for all supported kafka versions.
-To test against the latest source build, set KAFKA_VERSION=trunk
-and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended)
+By default, this will install 0.8.1.1, 0.8.2.2, and 0.9.0.0 brokers into the
+servers/ directory. To install a specific version, set `KAFKA_VERSION=1.2.3`:
 
 .. code:: bash
 
-    SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh
+    KAFKA_VERSION=0.8.0 ./build_integration.sh
 
 Then run the tests against supported Kafka versions; simply set the `KAFKA_VERSION`
 env variable to the server build you want to use for testing:
 
 .. code:: bash
 
-    KAFKA_VERSION=0.8.0 tox
-    KAFKA_VERSION=0.8.1 tox
-    KAFKA_VERSION=0.8.1.1 tox
-    KAFKA_VERSION=trunk tox
+    KAFKA_VERSION=0.9.0.0 tox -e py27
+
+To test against the kafka source tree, set KAFKA_VERSION=trunk
+[optionally set SCALA_VERSION (defaults to 2.10)]
+
+.. code:: bash
+
+    SCALA_VERSION=2.11 KAFKA_VERSION=trunk ./build_integration.sh
+    KAFKA_VERSION=trunk tox -e py35
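The tests.rst text above notes that the unit tests mock network interfaces. A minimal illustration of that style, not taken from the kafka-python suite, using the stdlib `unittest.mock` (the external `mock` package on python 2):

.. code:: python

    import unittest
    from unittest import mock


    class ConnectionLogicTest(unittest.TestCase):
        def test_send_called_once(self):
            conn = mock.Mock()                # stands in for a broker connection
            conn.send.return_value = 'future'
            self.assertEqual(conn.send(b'payload'), 'future')
            conn.send.assert_called_once_with(b'payload')


    if __name__ == '__main__':
        unittest.main()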
diff --git a/docs/usage.rst b/docs/usage.rst
index 6417cd8..e74e5af 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -1,68 +1,126 @@
 Usage
-=====
+*****
 
-SimpleProducer
---------------
+
+KafkaConsumer
+=============
 
 .. code:: python
 
-    from kafka import SimpleProducer, KafkaClient
+    from kafka import KafkaConsumer
 
-    # To send messages synchronously
-    kafka = KafkaClient('localhost:9092')
-    producer = SimpleProducer(kafka)
+    # To consume latest messages and auto-commit offsets
+    consumer = KafkaConsumer('my-topic',
+                             group_id='my-group',
+                             bootstrap_servers=['localhost:9092'])
+    for message in consumer:
+        # message value and key are raw bytes -- decode if necessary!
+        # e.g., for unicode: `message.value.decode('utf-8')`
+        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
+                                              message.offset, message.key,
+                                              message.value))
 
-    # Note that the application is responsible for encoding messages to type bytes
-    producer.send_messages(b'my-topic', b'some message')
-    producer.send_messages(b'my-topic', b'this method', b'is variadic')
+    # consume earliest available messages, don't commit offsets
+    KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
 
-    # Send unicode message
-    producer.send_messages(b'my-topic', u'你怎么样?'.encode('utf-8'))
+    # consume json messages
+    KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
+
+    # consume msgpack
+    KafkaConsumer(value_deserializer=msgpack.unpackb)
+
+    # StopIteration if no message after 1sec
+    KafkaConsumer(consumer_timeout_ms=1000)
+
+    # Subscribe to a regex topic pattern
+    consumer = KafkaConsumer()
+    consumer.subscribe(pattern='^awesome.*')
+
+    # Use multiple consumers in parallel w/ 0.9 kafka brokers
+    # typically you would run each on a different server / process / CPU
+    consumer1 = KafkaConsumer('my-topic',
+                              group_id='my-group',
+                              bootstrap_servers='my.server.com')
+    consumer2 = KafkaConsumer('my-topic',
+                              group_id='my-group',
+                              bootstrap_servers='my.server.com')
+
+
+There are many configuration options for the consumer class. See
+:class:`~kafka.KafkaConsumer` API documentation for more details.
+
+
+SimpleProducer
+==============
 
 Asynchronous Mode
 -----------------
 
 .. code:: python
 
+    from kafka import SimpleProducer, SimpleClient
+
     # To send messages asynchronously
-    producer = SimpleProducer(kafka, async=True)
-    producer.send_messages(b'my-topic', b'async message')
+    client = SimpleClient('localhost:9092')
+    producer = SimpleProducer(client, async=True)
+    producer.send_messages('my-topic', b'async message')
+
+    # To send messages in batch. You can use any of the available
+    # producers for doing this. The following producer will collect
+    # messages in batch and send them to Kafka after 20 messages are
+    # collected or every 60 seconds
+    # Notes:
+    # * If the producer dies before the messages are sent, there will be losses
+    # * Call producer.stop() to send the messages and cleanup
+    producer = SimpleProducer(client,
+                              async=True,
+                              batch_send_every_n=20,
+                              batch_send_every_t=60)
+
+Synchronous Mode
+----------------
+
+.. code:: python
+
+    from kafka import SimpleProducer, SimpleClient
+
+    # To send messages synchronously
+    client = SimpleClient('localhost:9092')
+    producer = SimpleProducer(client, async=False)
+
+    # Note that the application is responsible for encoding messages to type bytes
+    producer.send_messages('my-topic', b'some message')
+    producer.send_messages('my-topic', b'this method', b'is variadic')
+
+    # Send unicode message
+    producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))
 
     # To wait for acknowledgements
     # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
     #                         a local log before sending response
     # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
     #                            by all in sync replicas before sending a response
-    producer = SimpleProducer(kafka, async=False,
+    producer = SimpleProducer(client,
+                              async=False,
                               req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                               ack_timeout=2000,
                               sync_fail_on_error=False)
 
-    responses = producer.send_messages(b'my-topic', b'another message')
+    responses = producer.send_messages('my-topic', b'another message')
     for r in responses:
         logging.info(r.offset)
 
-    # To send messages in batch. You can use any of the available
-    # producers for doing this. The following producer will collect
-    # messages in batch and send them to Kafka after 20 messages are
-    # collected or every 60 seconds
-    # Notes:
-    # * If the producer dies before the messages are sent, there will be losses
-    # * Call producer.stop() to send the messages and cleanup
-    producer = SimpleProducer(kafka, async=True,
-                              batch_send_every_n=20,
-                              batch_send_every_t=60)
 
-Keyed messages
---------------
+KeyedProducer
+=============
 
 .. code:: python
 
     from kafka import (
-        KafkaClient, KeyedProducer,
+        SimpleClient, KeyedProducer,
         Murmur2Partitioner, RoundRobinPartitioner)
 
-    kafka = KafkaClient('localhost:9092')
+    kafka = SimpleClient('localhost:9092')
 
     # HashedPartitioner is default (currently uses python hash())
     producer = KeyedProducer(kafka)
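The KafkaConsumer examples in the hunk above use the iterator interface with auto-commit. For the :meth:`poll` micro-batch interface mentioned in index.rst, a minimal sketch with manual commits; `process_record` is a hypothetical handler, and a 0.9 broker is assumed for group coordination:

.. code:: python

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my-topic',
                             group_id='my-group',
                             enable_auto_commit=False,
                             bootstrap_servers=['localhost:9092'])
    while True:
        # {TopicPartition: [ConsumerRecord, ...]}; empty dict if no data
        batches = consumer.poll(timeout_ms=500)
        for tp, records in batches.items():
            for record in records:
                process_record(record)  # hypothetical handler
        consumer.commit()  # commit offsets only after records are processed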
@@ -74,131 +132,3 @@ Keyed messages
 
     # Or just produce round-robin (or just use SimpleProducer)
     producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
-
-
-
-KafkaConsumer
--------------
-
-.. code:: python
-
-    from kafka import KafkaConsumer
-
-    # To consume messages
-    consumer = KafkaConsumer('my-topic',
-                             group_id='my_group',
-                             bootstrap_servers=['localhost:9092'])
-    for message in consumer:
-        # message value is raw byte string -- decode if necessary!
-        # e.g., for unicode: `message.value.decode('utf-8')`
-        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
-                                             message.offset, message.key,
-                                             message.value))
-
-
-messages (m) are namedtuples with attributes:
-
-  * `m.topic`: topic name (str)
-  * `m.partition`: partition number (int)
-  * `m.offset`: message offset on topic-partition log (int)
-  * `m.key`: key (bytes - can be None)
-  * `m.value`: message (output of deserializer_class - default is raw bytes)
-
-
-.. code:: python
-
-    from kafka import KafkaConsumer
-
-    # more advanced consumer -- multiple topics w/ auto commit offset
-    # management
-    consumer = KafkaConsumer('topic1', 'topic2',
-                             bootstrap_servers=['localhost:9092'],
-                             group_id='my_consumer_group',
-                             auto_commit_enable=True,
-                             auto_commit_interval_ms=30 * 1000,
-                             auto_offset_reset='smallest')
-
-    # Infinite iteration
-    for m in consumer:
-        do_some_work(m)
-
-        # Mark this message as fully consumed
-        # so it can be included in the next commit
-        #
-        # **messages that are not marked w/ task_done currently do not commit!
-        consumer.task_done(m)
-
-    # If auto_commit_enable is False, remember to commit() periodically
-    consumer.commit()
-
-    # Batch process interface
-    while True:
-        for m in kafka.fetch_messages():
-            process_message(m)
-            consumer.task_done(m)
-
-
-    Configuration settings can be passed to constructor,
-    otherwise defaults will be used:
-
-.. code:: python
-
-    client_id='kafka.consumer.kafka',
-    group_id=None,
-    fetch_message_max_bytes=1024*1024,
-    fetch_min_bytes=1,
-    fetch_wait_max_ms=100,
-    refresh_leader_backoff_ms=200,
-    bootstrap_servers=[],
-    socket_timeout_ms=30*1000,
-    auto_offset_reset='largest',
-    deserializer_class=lambda msg: msg,
-    auto_commit_enable=False,
-    auto_commit_interval_ms=60 * 1000,
-    consumer_timeout_ms=-1
-
-    Configuration parameters are described in more detail at
-    http://kafka.apache.org/documentation.html#highlevelconsumerapi
-
-Multiprocess consumer
----------------------
-
-.. code:: python
-
-    from kafka import KafkaClient, MultiProcessConsumer
-
-    kafka = KafkaClient('localhost:9092')
-
-    # This will split the number of partitions among two processes
-    consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic', num_procs=2)
-
-    # This will spawn processes such that each handles 2 partitions max
-    consumer = MultiProcessConsumer(kafka, b'my-group', b'my-topic',
-                                    partitions_per_proc=2)
-
-    for message in consumer:
-        print(message)
-
-    for message in consumer.get_messages(count=5, block=True, timeout=4):
-        print(message)
-
-Low level
----------
-
-.. code:: python
-
-    from kafka import KafkaClient, create_message
-    from kafka.protocol import KafkaProtocol
-    from kafka.common import ProduceRequest
-
-    kafka = KafkaClient('localhost:9092')
-
-    req = ProduceRequest(topic=b'my-topic', partition=1,
-        messages=[create_message(b'some message')])
-    resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
-    kafka.close()
-
-    resps[0].topic      # b'my-topic'
-    resps[0].partition  # 1
-    resps[0].error      # 0 (hopefully)
-    resps[0].offset     # offset of the first message sent in this request
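The trailing hunk trims the KeyedProducer example down to partitioner selection. For reference, a sketch of sending keyed messages with the legacy API as renamed in this diff (topic as str, keys and payloads as bytes):

.. code:: python

    from kafka import SimpleClient, KeyedProducer, Murmur2Partitioner

    kafka = SimpleClient('localhost:9092')
    producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)

    # messages with the same key are routed to the same partition
    producer.send_messages('my-topic', b'user-1', b'first event')
    producer.send_messages('my-topic', b'user-1', b'second event')
    producer.send_messages('my-topic', b'user-2', b'another event')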