diff options
author | Will Daly <will@edx.org> | 2015-01-04 12:23:10 -0500 |
---|---|---|
committer | Will Daly <will.e.daly@gmail.com> | 2015-01-15 18:01:40 -0500 |
commit | 01f378328e5383d05d52428b815f992eb2c536cb (patch) | |
tree | 2d3366ed91b9744efd40d935a460040150c6d4d8 | |
parent | 02c2b469003e2ddcb051dbb4d95977137050c19f (diff) | |
download | kafka-python-01f378328e5383d05d52428b815f992eb2c536cb.tar.gz |
Add Sphinx API docs
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | README.md | 232 | ||||
-rw-r--r-- | docs/Makefile | 177 | ||||
-rw-r--r-- | docs/api_reference.rst | 67 | ||||
-rw-r--r-- | docs/conf.py | 264 | ||||
-rw-r--r-- | docs/index.rst | 59 | ||||
-rw-r--r-- | docs/install.rst | 79 | ||||
-rw-r--r-- | docs/make.bat | 242 | ||||
-rw-r--r-- | docs/requirements.txt | 7 | ||||
-rw-r--r-- | docs/tests.rst | 59 | ||||
-rw-r--r-- | docs/usage.rst | 122 | ||||
-rw-r--r-- | kafka/client.py | 48 | ||||
-rw-r--r-- | kafka/conn.py | 23 | ||||
-rw-r--r-- | kafka/consumer/base.py | 10 | ||||
-rw-r--r-- | kafka/consumer/kafka.py | 226 | ||||
-rw-r--r-- | kafka/consumer/multiprocess.py | 39 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 67 | ||||
-rw-r--r-- | kafka/context.py | 5 | ||||
-rw-r--r-- | kafka/partitioner/base.py | 10 | ||||
-rw-r--r-- | kafka/producer/base.py | 20 | ||||
-rw-r--r-- | kafka/producer/keyed.py | 22 | ||||
-rw-r--r-- | kafka/producer/simple.py | 30 | ||||
-rw-r--r-- | kafka/protocol.py | 125 | ||||
-rw-r--r-- | kafka/queue.py | 34 | ||||
-rw-r--r-- | kafka/util.py | 10 |
25 files changed, 1436 insertions, 542 deletions
@@ -8,3 +8,4 @@ env servers/*/kafka-bin .coverage .noseids +docs/_build @@ -2,6 +2,8 @@ [![Build Status](https://api.travis-ci.org/mumrah/kafka-python.png?branch=master)](https://travis-ci.org/mumrah/kafka-python) +[Full documentation available on ReadTheDocs](http://kafka-python.readthedocs.org/en/latest/) + This module provides low-level protocol support for Apache Kafka as well as high-level consumer and producer classes. Request batching is supported by the protocol as well as broker-aware request routing. Gzip and Snappy compression @@ -32,233 +34,3 @@ Python versions - 2.7 (tested on 2.7.8) - pypy (tested on pypy 2.3.1 / python 2.7.6) - (Python 3.3 and 3.4 support has been added to trunk and will be available the next release) - -# Usage - -## High level - -```python -from kafka import KafkaClient, SimpleProducer, SimpleConsumer - -# To send messages synchronously -kafka = KafkaClient("localhost:9092") -producer = SimpleProducer(kafka) - -# Note that the application is responsible for encoding messages to type str -producer.send_messages("my-topic", "some message") -producer.send_messages("my-topic", "this method", "is variadic") - -# Send unicode message -producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8')) - -# To send messages asynchronously -# WARNING: current implementation does not guarantee message delivery on failure! -# messages can get dropped! Use at your own risk! Or help us improve with a PR! -producer = SimpleProducer(kafka, async=True) -producer.send_messages("my-topic", "async message") - -# To wait for acknowledgements -# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to -# a local log before sending response -# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed -# by all in sync replicas before sending a response -producer = SimpleProducer(kafka, async=False, - req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, - ack_timeout=2000) - -response = producer.send_messages("my-topic", "another message") - -if response: - print(response[0].error) - print(response[0].offset) - -# To send messages in batch. You can use any of the available -# producers for doing this. The following producer will collect -# messages in batch and send them to Kafka after 20 messages are -# collected or every 60 seconds -# Notes: -# * If the producer dies before the messages are sent, there will be losses -# * Call producer.stop() to send the messages and cleanup -producer = SimpleProducer(kafka, batch_send=True, - batch_send_every_n=20, - batch_send_every_t=60) - -# To consume messages -consumer = SimpleConsumer(kafka, "my-group", "my-topic") -for message in consumer: - # message is raw byte string -- decode if necessary! - # e.g., for unicode: `message.decode('utf-8')` - print(message) - -kafka.close() -``` - -## Keyed messages -```python -from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner - -kafka = KafkaClient("localhost:9092") - -# HashedPartitioner is default -producer = KeyedProducer(kafka) -producer.send("my-topic", "key1", "some message") -producer.send("my-topic", "key2", "this methode") - -producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) -``` - -## Multiprocess consumer -```python -from kafka import KafkaClient, MultiProcessConsumer - -kafka = KafkaClient("localhost:9092") - -# This will split the number of partitions among two processes -consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2) - -# This will spawn processes such that each handles 2 partitions max -consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", - partitions_per_proc=2) - -for message in consumer: - print(message) - -for message in consumer.get_messages(count=5, block=True, timeout=4): - print(message) -``` - -## Low level - -```python -from kafka import KafkaClient, create_message -from kafka.protocol import KafkaProtocol -from kafka.common import ProduceRequest - -kafka = KafkaClient("localhost:9092") - -req = ProduceRequest(topic="my-topic", partition=1, - messages=[create_message("some message")]) -resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) -kafka.close() - -resps[0].topic # "my-topic" -resps[0].partition # 1 -resps[0].error # 0 (hopefully) -resps[0].offset # offset of the first message sent in this request -``` - -# Install - -Install with your favorite package manager - -## Latest Release -Pip: - -```shell -pip install kafka-python -``` - -Releases are also listed at https://github.com/mumrah/kafka-python/releases - - -## Bleeding-Edge -```shell -git clone https://github.com/mumrah/kafka-python -pip install ./kafka-python -``` - -Setuptools: -```shell -git clone https://github.com/mumrah/kafka-python -easy_install ./kafka-python -``` - -Using `setup.py` directly: -```shell -git clone https://github.com/mumrah/kafka-python -cd kafka-python -python setup.py install -``` - -## Optional Snappy install - -### Install Development Libraries -Download and build Snappy from http://code.google.com/p/snappy/downloads/list - -Ubuntu: -```shell -apt-get install libsnappy-dev -``` - -OSX: -```shell -brew install snappy -``` - -From Source: -```shell -wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz -tar xzvf snappy-1.0.5.tar.gz -cd snappy-1.0.5 -./configure -make -sudo make install -``` - -### Install Python Module -Install the `python-snappy` module -```shell -pip install python-snappy -``` - -# Tests - -## Run the unit tests - -```shell -tox -``` - -## Run a subset of unit tests -```shell -# run protocol tests only -tox -- -v test.test_protocol -``` - -```shell -# test with pypy only -tox -e pypy -``` - -```shell -# Run only 1 test, and use python 2.7 -tox -e py27 -- -v --with-id --collect-only -# pick a test number from the list like #102 -tox -e py27 -- -v --with-id 102 -``` - -## Run the integration tests - -The integration tests will actually start up real local Zookeeper -instance and Kafka brokers, and send messages in using the client. - -First, get the kafka binaries for integration testing: -```shell -./build_integration.sh -``` -By default, the build_integration.sh script will download binary -distributions for all supported kafka versions. -To test against the latest source build, set KAFKA_VERSION=trunk -and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended) -```shell -SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh -``` - -Then run the tests against supported Kafka versions, simply set the `KAFKA_VERSION` -env variable to the server build you want to use for testing: -```shell -KAFKA_VERSION=0.8.0 tox -KAFKA_VERSION=0.8.1 tox -KAFKA_VERSION=0.8.1.1 tox -KAFKA_VERSION=trunk tox -``` diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..5751f68 --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,177 @@ +# Makefile for Sphinx documentation +# + +# You can set these variables from the command line. +SPHINXOPTS = +SPHINXBUILD = sphinx-build +PAPER = +BUILDDIR = _build + +# User-friendly check for sphinx-build +ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1) +$(error The '$(SPHINXBUILD)' command was not found. Make sure you have Sphinx installed, then set the SPHINXBUILD environment variable to point to the full path of the '$(SPHINXBUILD)' executable. Alternatively you can add the directory with the executable to your PATH. If you don't have Sphinx installed, grab it from http://sphinx-doc.org/) +endif + +# Internal variables. +PAPEROPT_a4 = -D latex_paper_size=a4 +PAPEROPT_letter = -D latex_paper_size=letter +ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . +# the i18n builder cannot share the environment and doctrees with the others +I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . + +.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext + +help: + @echo "Please use \`make <target>' where <target> is one of" + @echo " html to make standalone HTML files" + @echo " dirhtml to make HTML files named index.html in directories" + @echo " singlehtml to make a single large HTML file" + @echo " pickle to make pickle files" + @echo " json to make JSON files" + @echo " htmlhelp to make HTML files and a HTML help project" + @echo " qthelp to make HTML files and a qthelp project" + @echo " devhelp to make HTML files and a Devhelp project" + @echo " epub to make an epub" + @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter" + @echo " latexpdf to make LaTeX files and run them through pdflatex" + @echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx" + @echo " text to make text files" + @echo " man to make manual pages" + @echo " texinfo to make Texinfo files" + @echo " info to make Texinfo files and run them through makeinfo" + @echo " gettext to make PO message catalogs" + @echo " changes to make an overview of all changed/added/deprecated items" + @echo " xml to make Docutils-native XML files" + @echo " pseudoxml to make pseudoxml-XML files for display purposes" + @echo " linkcheck to check all external links for integrity" + @echo " doctest to run all doctests embedded in the documentation (if enabled)" + +clean: + rm -rf $(BUILDDIR)/* + +html: + $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/html." + +dirhtml: + $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml." + +singlehtml: + $(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml + @echo + @echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml." + +pickle: + $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle + @echo + @echo "Build finished; now you can process the pickle files." + +json: + $(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json + @echo + @echo "Build finished; now you can process the JSON files." + +htmlhelp: + $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp + @echo + @echo "Build finished; now you can run HTML Help Workshop with the" \ + ".hhp project file in $(BUILDDIR)/htmlhelp." + +qthelp: + $(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp + @echo + @echo "Build finished; now you can run "qcollectiongenerator" with the" \ + ".qhcp project file in $(BUILDDIR)/qthelp, like this:" + @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/kafka-python.qhcp" + @echo "To view the help file:" + @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/kafka-python.qhc" + +devhelp: + $(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp + @echo + @echo "Build finished." + @echo "To view the help file:" + @echo "# mkdir -p $$HOME/.local/share/devhelp/kafka-python" + @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/kafka-python" + @echo "# devhelp" + +epub: + $(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub + @echo + @echo "Build finished. The epub file is in $(BUILDDIR)/epub." + +latex: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo + @echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex." + @echo "Run \`make' in that directory to run these through (pdf)latex" \ + "(use \`make latexpdf' here to do that automatically)." + +latexpdf: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through pdflatex..." + $(MAKE) -C $(BUILDDIR)/latex all-pdf + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +latexpdfja: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through platex and dvipdfmx..." + $(MAKE) -C $(BUILDDIR)/latex all-pdf-ja + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +text: + $(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text + @echo + @echo "Build finished. The text files are in $(BUILDDIR)/text." + +man: + $(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man + @echo + @echo "Build finished. The manual pages are in $(BUILDDIR)/man." + +texinfo: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo + @echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo." + @echo "Run \`make' in that directory to run these through makeinfo" \ + "(use \`make info' here to do that automatically)." + +info: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo "Running Texinfo files through makeinfo..." + make -C $(BUILDDIR)/texinfo info + @echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo." + +gettext: + $(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale + @echo + @echo "Build finished. The message catalogs are in $(BUILDDIR)/locale." + +changes: + $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes + @echo + @echo "The overview file is in $(BUILDDIR)/changes." + +linkcheck: + $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck + @echo + @echo "Link check complete; look for any errors in the above output " \ + "or in $(BUILDDIR)/linkcheck/output.txt." + +doctest: + $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest + @echo "Testing of doctests in the sources finished, look at the " \ + "results in $(BUILDDIR)/doctest/output.txt." + +xml: + $(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml + @echo + @echo "Build finished. The XML files are in $(BUILDDIR)/xml." + +pseudoxml: + $(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml + @echo + @echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml." diff --git a/docs/api_reference.rst b/docs/api_reference.rst new file mode 100644 index 0000000..5e178f1 --- /dev/null +++ b/docs/api_reference.rst @@ -0,0 +1,67 @@ +API Reference +============= + +kafka +----- +.. automodule:: kafka.client + :members: + +.. automodule:: kafka.codec + :members: + +.. automodule:: kafka.common + :members: + +.. automodule:: kafka.conn + :members: + +.. automodule:: kafka.context + :members: + +.. automodule:: kafka.protocol + :members: + +.. automodule:: kafka.queue + :members: + +.. automodule:: kafka.util + :members: + + +kafka.consumer +-------------- +.. automodule:: kafka.consumer.base + :members: + +.. automodule:: kafka.consumer.kafka + :members: + +.. automodule:: kafka.consumer.multiprocess + :members: + +.. automodule:: kafka.consumer.simple + :members: + + +kafka.partitioner +----------------- +.. automodule:: kafka.partitioner.base + :members: + +.. automodule:: kafka.partitioner.hashed + :members: + +.. automodule:: kafka.partitioner.roundrobin + :members: + + +kafka.producer +-------------- +.. automodule:: kafka.producer.base + :members: + +.. automodule:: kafka.producer.keyed + :members: + +.. automodule:: kafka.producer.simple + :members: diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 0000000..9e95f79 --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,264 @@ +# -*- coding: utf-8 -*- +# +# kafka-python documentation build configuration file, created by +# sphinx-quickstart on Sun Jan 4 12:21:50 2015. +# +# This file is execfile()d with the current directory set to its +# containing dir. +# +# Note that not all possible configuration values are present in this +# autogenerated file. +# +# All configuration values have a default; values that are commented out +# serve to show the default. + +import sys +import os + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +#sys.path.insert(0, os.path.abspath('.')) + +# -- General configuration ------------------------------------------------ + +# If your documentation needs a minimal Sphinx version, state it here. +#needs_sphinx = '1.0' + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.viewcode', + 'sphinxcontrib.napoleon', +] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# The suffix of source filenames. +source_suffix = '.rst' + +# The encoding of source files. +#source_encoding = 'utf-8-sig' + +# The master toctree document. +master_doc = 'index' + +# General information about the project. +project = u'kafka-python' +copyright = u'2015, David Arthur' + +# The version info for the project you're documenting, acts as replacement for +# |version| and |release|, also used in various other places throughout the +# built documents. +# +# The short X.Y version. +with open('../VERSION') as version_file: + version = version_file.read() + +# The full version, including alpha/beta/rc tags. +release = version + +# The language for content autogenerated by Sphinx. Refer to documentation +# for a list of supported languages. +#language = None + +# There are two options for replacing |today|: either, you set today to some +# non-false value, then it is used: +#today = '' +# Else, today_fmt is used as the format for a strftime call. +#today_fmt = '%B %d, %Y' + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +exclude_patterns = ['_build'] + +# The reST default role (used for this markup: `text`) to use for all +# documents. +#default_role = None + +# If true, '()' will be appended to :func: etc. cross-reference text. +#add_function_parentheses = True + +# If true, the current module name will be prepended to all description +# unit titles (such as .. function::). +#add_module_names = True + +# If true, sectionauthor and moduleauthor directives will be shown in the +# output. They are ignored by default. +#show_authors = False + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = 'sphinx' + +# A list of ignored prefixes for module index sorting. +#modindex_common_prefix = [] + +# If true, keep warnings as "system message" paragraphs in the built documents. +#keep_warnings = False + + +# -- Options for HTML output ---------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +html_theme = 'default' + +# Theme options are theme-specific and customize the look and feel of a theme +# further. For a list of options available for each theme, see the +# documentation. +#html_theme_options = {} + +# Add any paths that contain custom themes here, relative to this directory. +#html_theme_path = [] + +# The name for this set of Sphinx documents. If None, it defaults to +# "<project> v<release> documentation". +#html_title = None + +# A shorter title for the navigation bar. Default is the same as html_title. +#html_short_title = None + +# The name of an image file (relative to this directory) to place at the top +# of the sidebar. +#html_logo = None + +# The name of an image file (within the static path) to use as favicon of the +# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 +# pixels large. +#html_favicon = None + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] + +# Add any extra paths that contain custom files (such as robots.txt or +# .htaccess) here, relative to this directory. These files are copied +# directly to the root of the documentation. +#html_extra_path = [] + +# If not '', a 'Last updated on:' timestamp is inserted at every page bottom, +# using the given strftime format. +#html_last_updated_fmt = '%b %d, %Y' + +# If true, SmartyPants will be used to convert quotes and dashes to +# typographically correct entities. +#html_use_smartypants = True + +# Custom sidebar templates, maps document names to template names. +#html_sidebars = {} + +# Additional templates that should be rendered to pages, maps page names to +# template names. +#html_additional_pages = {} + +# If false, no module index is generated. +#html_domain_indices = True + +# If false, no index is generated. +#html_use_index = True + +# If true, the index is split into individual pages for each letter. +#html_split_index = False + +# If true, links to the reST sources are added to the pages. +#html_show_sourcelink = True + +# If true, "Created using Sphinx" is shown in the HTML footer. Default is True. +#html_show_sphinx = True + +# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True. +#html_show_copyright = True + +# If true, an OpenSearch description file will be output, and all pages will +# contain a <link> tag referring to it. The value of this option must be the +# base URL from which the finished HTML is served. +#html_use_opensearch = '' + +# This is the file name suffix for HTML files (e.g. ".xhtml"). +#html_file_suffix = None + +# Output file base name for HTML help builder. +htmlhelp_basename = 'kafka-pythondoc' + + +# -- Options for LaTeX output --------------------------------------------- + +latex_elements = { +# The paper size ('letterpaper' or 'a4paper'). +#'papersize': 'letterpaper', + +# The font size ('10pt', '11pt' or '12pt'). +#'pointsize': '10pt', + +# Additional stuff for the LaTeX preamble. +#'preamble': '', +} + +# Grouping the document tree into LaTeX files. List of tuples +# (source start file, target name, title, +# author, documentclass [howto, manual, or own class]). +latex_documents = [ + ('index', 'kafka-python.tex', u'kafka-python Documentation', + u'David Arthur', 'manual'), +] + +# The name of an image file (relative to this directory) to place at the top of +# the title page. +#latex_logo = None + +# For "manual" documents, if this is true, then toplevel headings are parts, +# not chapters. +#latex_use_parts = False + +# If true, show page references after internal links. +#latex_show_pagerefs = False + +# If true, show URL addresses after external links. +#latex_show_urls = False + +# Documents to append as an appendix to all manuals. +#latex_appendices = [] + +# If false, no module index is generated. +#latex_domain_indices = True + + +# -- Options for manual page output --------------------------------------- + +# One entry per manual page. List of tuples +# (source start file, name, description, authors, manual section). +man_pages = [ + ('index', 'kafka-python', u'kafka-python Documentation', + [u'David Arthur'], 1) +] + +# If true, show URL addresses after external links. +#man_show_urls = False + + +# -- Options for Texinfo output ------------------------------------------- + +# Grouping the document tree into Texinfo files. List of tuples +# (source start file, target name, title, author, +# dir menu entry, description, category) +texinfo_documents = [ + ('index', 'kafka-python', u'kafka-python Documentation', + u'David Arthur', 'kafka-python', 'One line description of project.', + 'Miscellaneous'), +] + +# Documents to append as an appendix to all manuals. +#texinfo_appendices = [] + +# If false, no module index is generated. +#texinfo_domain_indices = True + +# How to display URL addresses: 'footnote', 'no', or 'inline'. +#texinfo_show_urls = 'footnote' + +# If true, do not generate a @detailmenu in the "Top" node's menu. +#texinfo_no_detailmenu = False diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 0000000..82c3f57 --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,59 @@ + +kafka-python +============ + +This module provides low-level protocol support for Apache Kafka as well as +high-level consumer and producer classes. Request batching is supported by the +protocol as well as broker-aware request routing. Gzip and Snappy compression +is also supported for message sets. + +http://kafka.apache.org/ + +On Freenode IRC at #kafka-python, as well as #apache-kafka + +For general discussion of kafka-client design and implementation (not python specific), +see https://groups.google.com/forum/m/#!forum/kafka-clients + +Status +------ + +The current stable version of this package is `0.9.2 <https://github.com/mumrah/kafka-python/releases/tag/v0.9.2>`_ and is compatible with: + +Kafka broker versions + +* 0.8.0 +* 0.8.1 +* 0.8.1.1 + +Python versions + +* 2.6 (tested on 2.6.9) +* 2.7 (tested on 2.7.8) +* pypy (tested on pypy 2.3.1 / python 2.7.6) +* (Python 3.3 and 3.4 support has been added to trunk and will be available the next release) + +License +------- + +Copyright 2014, David Arthur under Apache License, v2.0. See `LICENSE <https://github.com/mumrah/kafka-python/blob/master/LICENSE>`_. + + +Contents +-------- + +.. toctree:: + :maxdepth: 2 + + install + tests + usage + api_reference + + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` + diff --git a/docs/install.rst b/docs/install.rst new file mode 100644 index 0000000..1dd6d4e --- /dev/null +++ b/docs/install.rst @@ -0,0 +1,79 @@ +Install +======= + +Install with your favorite package manager + +Latest Release +-------------- +Pip: + +.. code:: bash + + pip install kafka-python + +Releases are also listed at https://github.com/mumrah/kafka-python/releases + + +Bleeding-Edge +------------- + +.. code:: bash + + git clone https://github.com/mumrah/kafka-python + pip install ./kafka-python + +Setuptools: + +.. code:: bash + + git clone https://github.com/mumrah/kafka-python + easy_install ./kafka-python + +Using `setup.py` directly: + +.. code:: bash + + git clone https://github.com/mumrah/kafka-python + cd kafka-python + python setup.py install + + +Optional Snappy install +----------------------- + +Install Development Libraries +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Download and build Snappy from http://code.google.com/p/snappy/downloads/list + +Ubuntu: + +.. code:: bash + + apt-get install libsnappy-dev + +OSX: + +.. code:: bash + + brew install snappy + +From Source: + +.. code:: bash + + wget http://snappy.googlecode.com/files/snappy-1.0.5.tar.gz + tar xzvf snappy-1.0.5.tar.gz + cd snappy-1.0.5 + ./configure + make + sudo make install + +Install Python Module +^^^^^^^^^^^^^^^^^^^^^ + +Install the `python-snappy` module + +.. code:: bash + + pip install python-snappy diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..2e9d7dc --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,242 @@ +@ECHO OFF
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+ set SPHINXBUILD=sphinx-build
+)
+set BUILDDIR=_build
+set ALLSPHINXOPTS=-d %BUILDDIR%/doctrees %SPHINXOPTS% .
+set I18NSPHINXOPTS=%SPHINXOPTS% .
+if NOT "%PAPER%" == "" (
+ set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS%
+ set I18NSPHINXOPTS=-D latex_paper_size=%PAPER% %I18NSPHINXOPTS%
+)
+
+if "%1" == "" goto help
+
+if "%1" == "help" (
+ :help
+ echo.Please use `make ^<target^>` where ^<target^> is one of
+ echo. html to make standalone HTML files
+ echo. dirhtml to make HTML files named index.html in directories
+ echo. singlehtml to make a single large HTML file
+ echo. pickle to make pickle files
+ echo. json to make JSON files
+ echo. htmlhelp to make HTML files and a HTML help project
+ echo. qthelp to make HTML files and a qthelp project
+ echo. devhelp to make HTML files and a Devhelp project
+ echo. epub to make an epub
+ echo. latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter
+ echo. text to make text files
+ echo. man to make manual pages
+ echo. texinfo to make Texinfo files
+ echo. gettext to make PO message catalogs
+ echo. changes to make an overview over all changed/added/deprecated items
+ echo. xml to make Docutils-native XML files
+ echo. pseudoxml to make pseudoxml-XML files for display purposes
+ echo. linkcheck to check all external links for integrity
+ echo. doctest to run all doctests embedded in the documentation if enabled
+ goto end
+)
+
+if "%1" == "clean" (
+ for /d %%i in (%BUILDDIR%\*) do rmdir /q /s %%i
+ del /q /s %BUILDDIR%\*
+ goto end
+)
+
+
+%SPHINXBUILD% 2> nul
+if errorlevel 9009 (
+ echo.
+ echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
+ echo.installed, then set the SPHINXBUILD environment variable to point
+ echo.to the full path of the 'sphinx-build' executable. Alternatively you
+ echo.may add the Sphinx directory to PATH.
+ echo.
+ echo.If you don't have Sphinx installed, grab it from
+ echo.http://sphinx-doc.org/
+ exit /b 1
+)
+
+if "%1" == "html" (
+ %SPHINXBUILD% -b html %ALLSPHINXOPTS% %BUILDDIR%/html
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/html.
+ goto end
+)
+
+if "%1" == "dirhtml" (
+ %SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% %BUILDDIR%/dirhtml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/dirhtml.
+ goto end
+)
+
+if "%1" == "singlehtml" (
+ %SPHINXBUILD% -b singlehtml %ALLSPHINXOPTS% %BUILDDIR%/singlehtml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The HTML pages are in %BUILDDIR%/singlehtml.
+ goto end
+)
+
+if "%1" == "pickle" (
+ %SPHINXBUILD% -b pickle %ALLSPHINXOPTS% %BUILDDIR%/pickle
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can process the pickle files.
+ goto end
+)
+
+if "%1" == "json" (
+ %SPHINXBUILD% -b json %ALLSPHINXOPTS% %BUILDDIR%/json
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can process the JSON files.
+ goto end
+)
+
+if "%1" == "htmlhelp" (
+ %SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% %BUILDDIR%/htmlhelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can run HTML Help Workshop with the ^
+.hhp project file in %BUILDDIR%/htmlhelp.
+ goto end
+)
+
+if "%1" == "qthelp" (
+ %SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% %BUILDDIR%/qthelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; now you can run "qcollectiongenerator" with the ^
+.qhcp project file in %BUILDDIR%/qthelp, like this:
+ echo.^> qcollectiongenerator %BUILDDIR%\qthelp\kafka-python.qhcp
+ echo.To view the help file:
+ echo.^> assistant -collectionFile %BUILDDIR%\qthelp\kafka-python.ghc
+ goto end
+)
+
+if "%1" == "devhelp" (
+ %SPHINXBUILD% -b devhelp %ALLSPHINXOPTS% %BUILDDIR%/devhelp
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished.
+ goto end
+)
+
+if "%1" == "epub" (
+ %SPHINXBUILD% -b epub %ALLSPHINXOPTS% %BUILDDIR%/epub
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The epub file is in %BUILDDIR%/epub.
+ goto end
+)
+
+if "%1" == "latex" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished; the LaTeX files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "latexpdf" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ cd %BUILDDIR%/latex
+ make all-pdf
+ cd %BUILDDIR%/..
+ echo.
+ echo.Build finished; the PDF files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "latexpdfja" (
+ %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex
+ cd %BUILDDIR%/latex
+ make all-pdf-ja
+ cd %BUILDDIR%/..
+ echo.
+ echo.Build finished; the PDF files are in %BUILDDIR%/latex.
+ goto end
+)
+
+if "%1" == "text" (
+ %SPHINXBUILD% -b text %ALLSPHINXOPTS% %BUILDDIR%/text
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The text files are in %BUILDDIR%/text.
+ goto end
+)
+
+if "%1" == "man" (
+ %SPHINXBUILD% -b man %ALLSPHINXOPTS% %BUILDDIR%/man
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The manual pages are in %BUILDDIR%/man.
+ goto end
+)
+
+if "%1" == "texinfo" (
+ %SPHINXBUILD% -b texinfo %ALLSPHINXOPTS% %BUILDDIR%/texinfo
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The Texinfo files are in %BUILDDIR%/texinfo.
+ goto end
+)
+
+if "%1" == "gettext" (
+ %SPHINXBUILD% -b gettext %I18NSPHINXOPTS% %BUILDDIR%/locale
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The message catalogs are in %BUILDDIR%/locale.
+ goto end
+)
+
+if "%1" == "changes" (
+ %SPHINXBUILD% -b changes %ALLSPHINXOPTS% %BUILDDIR%/changes
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.The overview file is in %BUILDDIR%/changes.
+ goto end
+)
+
+if "%1" == "linkcheck" (
+ %SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% %BUILDDIR%/linkcheck
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Link check complete; look for any errors in the above output ^
+or in %BUILDDIR%/linkcheck/output.txt.
+ goto end
+)
+
+if "%1" == "doctest" (
+ %SPHINXBUILD% -b doctest %ALLSPHINXOPTS% %BUILDDIR%/doctest
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Testing of doctests in the sources finished, look at the ^
+results in %BUILDDIR%/doctest/output.txt.
+ goto end
+)
+
+if "%1" == "xml" (
+ %SPHINXBUILD% -b xml %ALLSPHINXOPTS% %BUILDDIR%/xml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The XML files are in %BUILDDIR%/xml.
+ goto end
+)
+
+if "%1" == "pseudoxml" (
+ %SPHINXBUILD% -b pseudoxml %ALLSPHINXOPTS% %BUILDDIR%/pseudoxml
+ if errorlevel 1 exit /b 1
+ echo.
+ echo.Build finished. The pseudo-XML files are in %BUILDDIR%/pseudoxml.
+ goto end
+)
+
+:end
diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..94c50d8 --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1,7 @@ +sphinx +sphinxcontrib-napoleon + +# Install kafka-python in editable mode +# This allows the sphinx autodoc module +# to load the Python modules and extract docstrings. +-e .. diff --git a/docs/tests.rst b/docs/tests.rst new file mode 100644 index 0000000..df9a3ef --- /dev/null +++ b/docs/tests.rst @@ -0,0 +1,59 @@ +Tests +===== + +Run the unit tests +------------------ + +.. code:: bash + + tox + + +Run a subset of unit tests +-------------------------- + +.. code:: bash + + # run protocol tests only + tox -- -v test.test_protocol + + # test with pypy only + tox -e pypy + + # Run only 1 test, and use python 2.7 + tox -e py27 -- -v --with-id --collect-only + + # pick a test number from the list like #102 + tox -e py27 -- -v --with-id 102 + + +Run the integration tests +------------------------- + +The integration tests will actually start up real local Zookeeper +instance and Kafka brokers, and send messages in using the client. + +First, get the kafka binaries for integration testing: + +.. code:: bash + + ./build_integration.sh + +By default, the build_integration.sh script will download binary +distributions for all supported kafka versions. +To test against the latest source build, set KAFKA_VERSION=trunk +and optionally set SCALA_VERSION (defaults to 2.8.0, but 2.10.1 is recommended) + +.. code:: bash + + SCALA_VERSION=2.10.1 KAFKA_VERSION=trunk ./build_integration.sh + +Then run the tests against supported Kafka versions, simply set the `KAFKA_VERSION` +env variable to the server build you want to use for testing: + +.. code:: bash + + KAFKA_VERSION=0.8.0 tox + KAFKA_VERSION=0.8.1 tox + KAFKA_VERSION=0.8.1.1 tox + KAFKA_VERSION=trunk tox diff --git a/docs/usage.rst b/docs/usage.rst new file mode 100644 index 0000000..5f3fcea --- /dev/null +++ b/docs/usage.rst @@ -0,0 +1,122 @@ +Usage +===== + +High level +---------- + +.. code:: python + + from kafka import KafkaClient, SimpleProducer, SimpleConsumer + + # To send messages synchronously + kafka = KafkaClient("localhost:9092") + producer = SimpleProducer(kafka) + + # Note that the application is responsible for encoding messages to type str + producer.send_messages("my-topic", "some message") + producer.send_messages("my-topic", "this method", "is variadic") + + # Send unicode message + producer.send_messages("my-topic", u'你怎么样?'.encode('utf-8')) + + # To send messages asynchronously + # WARNING: current implementation does not guarantee message delivery on failure! + # messages can get dropped! Use at your own risk! Or help us improve with a PR! + producer = SimpleProducer(kafka, async=True) + producer.send_messages("my-topic", "async message") + + # To wait for acknowledgements + # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to + # a local log before sending response + # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed + # by all in sync replicas before sending a response + producer = SimpleProducer(kafka, async=False, + req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, + ack_timeout=2000) + + response = producer.send_messages("my-topic", "another message") + + if response: + print(response[0].error) + print(response[0].offset) + + # To send messages in batch. You can use any of the available + # producers for doing this. The following producer will collect + # messages in batch and send them to Kafka after 20 messages are + # collected or every 60 seconds + # Notes: + # * If the producer dies before the messages are sent, there will be losses + # * Call producer.stop() to send the messages and cleanup + producer = SimpleProducer(kafka, batch_send=True, + batch_send_every_n=20, + batch_send_every_t=60) + + # To consume messages + consumer = SimpleConsumer(kafka, "my-group", "my-topic") + for message in consumer: + # message is raw byte string -- decode if necessary! + # e.g., for unicode: `message.decode('utf-8')` + print(message) + + kafka.close() + + +Keyed messages +-------------- + +.. code:: python + + from kafka import KafkaClient, KeyedProducer, HashedPartitioner, RoundRobinPartitioner + + kafka = KafkaClient("localhost:9092") + + # HashedPartitioner is default + producer = KeyedProducer(kafka) + producer.send("my-topic", "key1", "some message") + producer.send("my-topic", "key2", "this methode") + + producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) + + +Multiprocess consumer +--------------------- + +.. code:: python + + from kafka import KafkaClient, MultiProcessConsumer + + kafka = KafkaClient("localhost:9092") + + # This will split the number of partitions among two processes + consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2) + + # This will spawn processes such that each handles 2 partitions max + consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", + partitions_per_proc=2) + + for message in consumer: + print(message) + + for message in consumer.get_messages(count=5, block=True, timeout=4): + print(message) + +Low level +--------- + +.. code:: python + + from kafka import KafkaClient, create_message + from kafka.protocol import KafkaProtocol + from kafka.common import ProduceRequest + + kafka = KafkaClient("localhost:9092") + + req = ProduceRequest(topic="my-topic", partition=1, + messages=[create_message("some message")]) + resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) + kafka.close() + + resps[0].topic # "my-topic" + resps[0].partition # 1 + resps[0].error # 0 (hopefully) + resps[0].offset # offset of the first message sent in this request diff --git a/kafka/client.py b/kafka/client.py index bc3d853..6d1b9f8 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -131,19 +131,21 @@ class KafkaClient(object): the leader broker for that partition using the supplied encode/decode functions - Params - ====== + Arguments: + payloads: list of object-like entities with a topic (str) and - partition (int) attribute + partition (int) attribute + encode_fn: a method to encode the list of payloads to a request body, - must accept client_id, correlation_id, and payloads as - keyword arguments + must accept client_id, correlation_id, and payloads as + keyword arguments + decode_fn: a method to decode a response body into response objects. - The response objects must be object-like and have topic - and partition attributes + The response objects must be object-like and have topic + and partition attributes + + Returns: - Return - ====== List of response objects in the same order as the supplied payloads """ @@ -285,9 +287,9 @@ class KafkaClient(object): This method should be called after receiving any error - @param: *topics (optional) - If a list of topics is provided, the metadata refresh will be limited - to the specified topics only. + Arguments: + *topics (optional): If a list of topics is provided, + the metadata refresh will be limited to the specified topics only. Exceptions: ---------- @@ -379,18 +381,16 @@ class KafkaClient(object): sent to a specific broker. Output is a list of responses in the same order as the list of payloads specified - Params - ====== - payloads: list of ProduceRequest - fail_on_error: boolean, should we raise an Exception if we - encounter an API error? - callback: function, instead of returning the ProduceResponse, - first pass it through this function - - Return - ====== - list of ProduceResponse or callback(ProduceResponse), in the - order of input payloads + Arguments: + payloads: list of ProduceRequest + fail_on_error: boolean, should we raise an Exception if we + encounter an API error? + callback: function, instead of returning the ProduceResponse, + first pass it through this function + + Returns: + list of ProduceResponse or callback(ProduceResponse), in the + order of input payloads """ encoder = functools.partial( diff --git a/kafka/conn.py b/kafka/conn.py index ddfee8b..30debec 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -47,10 +47,11 @@ class KafkaConnection(local): we can do something in here to facilitate multiplexed requests/responses since the Kafka API includes a correlation id. - host: the host name or IP address of a kafka broker - port: the port number the kafka broker is listening on - timeout: default 120. The socket timeout for sending and receiving data - in seconds. None means no timeout, so a request can block forever. + Arguments: + host: the host name or IP address of a kafka broker + port: the port number the kafka broker is listening on + timeout: default 120. The socket timeout for sending and receiving data + in seconds. None means no timeout, so a request can block forever. """ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): super(KafkaConnection, self).__init__() @@ -116,8 +117,10 @@ class KafkaConnection(local): def send(self, request_id, payload): """ Send a request to Kafka - param: request_id -- can be any int (used only for debug logging...) - param: payload -- an encoded kafka packet (see KafkaProtocol) + + Arguments:: + request_id (int): can be any int (used only for debug logging...) + payload: an encoded kafka packet (see KafkaProtocol) """ log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) @@ -135,8 +138,12 @@ class KafkaConnection(local): def recv(self, request_id): """ Get a response packet from Kafka - param: request_id -- can be any int (only used for debug logging...) - returns encoded kafka packet response from server as type str + + Arguments: + request_id: can be any int (only used for debug logging...) + + Returns: + str: Encoded kafka packet response from server """ log.debug("Reading response %d from Kafka" % request_id) diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 2464aaf..9cdcf89 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -32,9 +32,11 @@ class Consumer(object): Base class to be used by other consumers. Not to be used directly This base class provides logic for + * initialization and fetching metadata of partitions * Auto-commit logic * APIs for fetching pending message count + """ def __init__(self, client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, @@ -93,8 +95,9 @@ class Consumer(object): """ Commit offsets for this consumer - partitions: list of partitions to commit, default is to commit - all of them + Keyword Arguments: + partitions (list): list of partitions to commit, default is to commit + all of them """ # short circuit if nothing happened. This check is kept outside @@ -148,7 +151,8 @@ class Consumer(object): """ Gets the pending message count - partitions: list of partitions to check for, default is to check all + Keyword Arguments: + partitions (list): list of partitions to check for, default is to check all """ if not partitions: partitions = self.offsets.keys() diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index f16b526..ae0f0b9 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -54,72 +54,78 @@ class KafkaConsumer(object): """ A simpler kafka consumer - ``` - # A very basic 'tail' consumer, with no stored offset management - kafka = KafkaConsumer('topic1') - for m in kafka: - print m - - # Alternate interface: next() - print kafka.next() - - # Alternate interface: batch iteration - while True: - for m in kafka.fetch_messages(): - print m - print "Done with batch - let's do another!" - ``` - - ``` - # more advanced consumer -- multiple topics w/ auto commit offset management - kafka = KafkaConsumer('topic1', 'topic2', - group_id='my_consumer_group', - auto_commit_enable=True, - auto_commit_interval_ms=30 * 1000, - auto_offset_reset='smallest') - - # Infinite iteration - for m in kafka: - process_message(m) - kafka.task_done(m) - - # Alternate interface: next() - m = kafka.next() - process_message(m) - kafka.task_done(m) - - # If auto_commit_enable is False, remember to commit() periodically - kafka.commit() - - # Batch process interface - while True: - for m in kafka.fetch_messages(): + .. code:: python + + # A very basic 'tail' consumer, with no stored offset management + kafka = KafkaConsumer('topic1') + for m in kafka: + print m + + # Alternate interface: next() + print kafka.next() + + # Alternate interface: batch iteration + while True: + for m in kafka.fetch_messages(): + print m + print "Done with batch - let's do another!" + + + .. code:: python + + # more advanced consumer -- multiple topics w/ auto commit offset management + kafka = KafkaConsumer('topic1', 'topic2', + group_id='my_consumer_group', + auto_commit_enable=True, + auto_commit_interval_ms=30 * 1000, + auto_offset_reset='smallest') + + # Infinite iteration + for m in kafka: + process_message(m) + kafka.task_done(m) + + # Alternate interface: next() + m = kafka.next() process_message(m) kafka.task_done(m) - ``` + + # If auto_commit_enable is False, remember to commit() periodically + kafka.commit() + + # Batch process interface + while True: + for m in kafka.fetch_messages(): + process_message(m) + kafka.task_done(m) + messages (m) are namedtuples with attributes: - m.topic: topic name (str) - m.partition: partition number (int) - m.offset: message offset on topic-partition log (int) - m.key: key (bytes - can be None) - m.value: message (output of deserializer_class - default is raw bytes) + + * `m.topic`: topic name (str) + * `m.partition`: partition number (int) + * `m.offset`: message offset on topic-partition log (int) + * `m.key`: key (bytes - can be None) + * `m.value`: message (output of deserializer_class - default is raw bytes) Configuration settings can be passed to constructor, otherwise defaults will be used: - client_id='kafka.consumer.kafka', - group_id=None, - fetch_message_max_bytes=1024*1024, - fetch_min_bytes=1, - fetch_wait_max_ms=100, - refresh_leader_backoff_ms=200, - metadata_broker_list=None, - socket_timeout_ms=30*1000, - auto_offset_reset='largest', - deserializer_class=lambda msg: msg, - auto_commit_enable=False, - auto_commit_interval_ms=60 * 1000, - consumer_timeout_ms=-1 + + .. code:: python + + client_id='kafka.consumer.kafka', + group_id=None, + fetch_message_max_bytes=1024*1024, + fetch_min_bytes=1, + fetch_wait_max_ms=100, + refresh_leader_backoff_ms=200, + metadata_broker_list=None, + socket_timeout_ms=30*1000, + auto_offset_reset='largest', + deserializer_class=lambda msg: msg, + auto_commit_enable=False, + auto_commit_interval_ms=60 * 1000, + consumer_timeout_ms=-1 Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi @@ -133,6 +139,9 @@ class KafkaConsumer(object): """ Configuration settings can be passed to constructor, otherwise defaults will be used: + + .. code:: python + client_id='kafka.consumer.kafka', group_id=None, fetch_message_max_bytes=1024*1024, @@ -189,28 +198,35 @@ class KafkaConsumer(object): Optionally specify offsets to start from Accepts types: - str (utf-8): topic name (will consume all available partitions) - tuple: (topic, partition) - dict: { topic: partition } - { topic: [partition list] } - { topic: (partition tuple,) } + + * str (utf-8): topic name (will consume all available partitions) + * tuple: (topic, partition) + * dict: + - { topic: partition } + - { topic: [partition list] } + - { topic: (partition tuple,) } Optionally, offsets can be specified directly: - tuple: (topic, partition, offset) - dict: { (topic, partition): offset, ... } - Ex: - kafka = KafkaConsumer() + * tuple: (topic, partition, offset) + * dict: { (topic, partition): offset, ... } + + Example: + + .. code:: python + + kafka = KafkaConsumer() - # Consume topic1-all; topic2-partition2; topic3-partition0 - kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) + # Consume topic1-all; topic2-partition2; topic3-partition0 + kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}) - # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456 - # using tuples -- - kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456)) + # Consume topic1-0 starting at offset 123, and topic2-1 at offset 456 + # using tuples -- + kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456)) + + # using dict -- + kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 }) - # using dict -- - kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 }) """ self._topics = [] self._client.load_metadata_for_topics() @@ -309,10 +325,12 @@ class KafkaConsumer(object): Otherwise blocks indefinitely Note that this is also the method called internally during iteration: - ``` - for m in consumer: - pass - ``` + + .. code:: python + + for m in consumer: + pass + """ self._set_consumer_timeout_start() while True: @@ -336,11 +354,12 @@ class KafkaConsumer(object): OffsetOutOfRange, per the configured `auto_offset_reset` policy Key configuration parameters: - `fetch_message_max_bytes` - `fetch_max_wait_ms` - `fetch_min_bytes` - `deserializer_class` - `auto_offset_reset` + + * `fetch_message_max_bytes` + * `fetch_max_wait_ms` + * `fetch_min_bytes` + * `deserializer_class` + * `auto_offset_reset` """ max_bytes = self._config['fetch_message_max_bytes'] @@ -418,20 +437,18 @@ class KafkaConsumer(object): """ Request available fetch offsets for a single topic/partition - @param topic (str) - @param partition (int) - @param request_time_ms (int) -- Used to ask for all messages before a - certain time (ms). There are two special - values. Specify -1 to receive the latest - offset (i.e. the offset of the next coming - message) and -2 to receive the earliest - available offset. Note that because offsets - are pulled in descending order, asking for - the earliest offset will always return you - a single element. - @param max_num_offsets (int) - - @return offsets (list) + Arguments: + topic (str) + partition (int) + request_time_ms (int): Used to ask for all messages before a + certain time (ms). There are two special values. Specify -1 to receive the latest + offset (i.e. the offset of the next coming message) and -2 to receive the earliest + available offset. Note that because offsets are pulled in descending order, asking for + the earliest offset will always return you a single element. + max_num_offsets (int) + + Returns: + offsets (list) """ reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)] @@ -448,9 +465,12 @@ class KafkaConsumer(object): def offsets(self, group=None): """ - Returns a copy of internal offsets struct - optional param: group [fetch|commit|task_done|highwater] - if no group specified, returns all groups + Keyword Arguments: + group: Either "fetch", "commit", "task_done", or "highwater". + If no group specified, returns all groups. + + Returns: + A copy of internal offsets struct """ if not group: return { @@ -498,8 +518,8 @@ class KafkaConsumer(object): Store consumed message offsets (marked via task_done()) to kafka cluster for this consumer_group. - Note -- this functionality requires server version >=0.8.1.1 - see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI + **Note**: this functionality requires server version >=0.8.1.1 + See `this wiki page <https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI>`_. """ if not self._config['group_id']: logger.warning('Cannot commit without a group_id!') diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 912e64b..4dc04dc 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -80,19 +80,21 @@ class MultiProcessConsumer(Consumer): A consumer implementation that consumes partitions for a topic in parallel using multiple processes - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - num_procs: Number of processes to start for consuming messages. - The available partitions will be divided among these processes - partitions_per_proc: Number of partitions to be allocated per process - (overrides num_procs) + Arguments: + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + Keyword Arguments: + auto_commit: default True. Whether or not to auto commit the offsets + auto_commit_every_n: default 100. How many messages to consume + before a commit + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + num_procs: Number of processes to start for consuming messages. + The available partitions will be divided among these processes + partitions_per_proc: Number of partitions to be allocated per process + (overrides num_procs) Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -198,11 +200,12 @@ class MultiProcessConsumer(Consumer): """ Fetch the specified number of messages - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + Keyword Arguments: + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. """ messages = [] diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index df975f4..000fcd9 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -67,24 +67,32 @@ class SimpleConsumer(Consumer): A simple consumer implementation that consumes all/specified partitions for a topic - client: a connected KafkaClient - group: a name for this consumer, used for offset storage and must be unique - topic: the topic to consume - partitions: An optional list of partitions to consume the data from - - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - fetch_size_bytes: number of bytes to request in a FetchRequest - buffer_size: default 4K. Initial number of bytes to tell kafka we - have available. This will double as needed. - max_buffer_size: default 16K. Max number of bytes to tell kafka we have - available. None means no limit. - iter_timeout: default None. How much time (in seconds) to wait for a - message in the iterator before exiting. None means no - timeout, so it will wait forever. + Arguments: + client: a connected KafkaClient + group: a name for this consumer, used for offset storage and must be unique + topic: the topic to consume + + Keyword Arguments: + partitions: An optional list of partitions to consume the data from + + auto_commit: default True. Whether or not to auto commit the offsets + + auto_commit_every_n: default 100. How many messages to consume + before a commit + + auto_commit_every_t: default 5000. How much time (in milliseconds) to + wait before commit + fetch_size_bytes: number of bytes to request in a FetchRequest + + buffer_size: default 4K. Initial number of bytes to tell kafka we + have available. This will double as needed. + + max_buffer_size: default 16K. Max number of bytes to tell kafka we have + available. None means no limit. + + iter_timeout: default None. How much time (in seconds) to wait for a + message in the iterator before exiting. None means no + timeout, so it will wait forever. Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will @@ -133,11 +141,13 @@ class SimpleConsumer(Consumer): """ Alter the current offset in the consumer, similar to fseek - offset: how much to modify the offset - whence: where to modify it from - 0 is relative to the earliest available offset (head) - 1 is relative to the current offset - 2 is relative to the latest known offset (tail) + Arguments: + offset: how much to modify the offset + whence: where to modify it from + + * 0 is relative to the earliest available offset (head) + * 1 is relative to the current offset + * 2 is relative to the latest known offset (tail) """ if whence == 1: # relative to current position @@ -180,11 +190,12 @@ class SimpleConsumer(Consumer): """ Fetch the specified number of messages - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till some messages are fetched. - timeout: If block is True, the function will block for the specified - time (in seconds) until count messages is fetched. If None, - it will block forever. + Keyword Arguments: + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If block is True, the function will block for the specified + time (in seconds) until count messages is fetched. If None, + it will block forever. """ messages = [] if timeout is not None: diff --git a/kafka/context.py b/kafka/context.py index 98ed7b3..ade4db8 100644 --- a/kafka/context.py +++ b/kafka/context.py @@ -18,6 +18,8 @@ class OffsetCommitContext(object): Example: + .. code:: python + consumer = SimpleConsumer(client, group, topic, auto_commit=False) consumer.provide_partition_info() consumer.fetch_last_known_offsets() @@ -57,7 +59,10 @@ class OffsetCommitContext(object): In order to know the current partition, it is helpful to initialize the consumer to provide partition info via: + .. code:: python + consumer.provide_partition_info() + """ max_offset = max(offset + 1, self.high_water_mark.get(partition, 0)) diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py index c62b7ed..0b1bb59 100644 --- a/kafka/partitioner/base.py +++ b/kafka/partitioner/base.py @@ -7,7 +7,8 @@ class Partitioner(object): """ Initialize the partitioner - partitions - A list of available partitions (during startup) + Arguments: + partitions: A list of available partitions (during startup) """ self.partitions = partitions @@ -16,8 +17,9 @@ class Partitioner(object): Takes a string key and num_partitions as argument and returns a partition to be used for the message - partitions - The list of partitions is passed in every call. This - may look like an overhead, but it will be useful - (in future) when we handle cases like rebalancing + Arguments: + partitions: The list of partitions is passed in every call. This + may look like an overhead, but it will be useful + (in future) when we handle cases like rebalancing """ raise NotImplementedError('partition function has to be implemented') diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 6e19b92..5b41bc9 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -85,20 +85,20 @@ class Producer(object): """ Base class to be used by producers - Params: - client - The Kafka client instance to use - async - If set to true, the messages are sent asynchronously via another + Arguments: + client: The Kafka client instance to use + async: If set to true, the messages are sent asynchronously via another thread (process). We will not wait for a response to these WARNING!!! current implementation of async producer does not guarantee message delivery. Use at your own risk! Or help us improve with a PR! - req_acks - A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout + req_acks: A value indicating the acknowledgements that the server must + receive before responding to the request + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout """ ACK_NOT_REQUIRED = 0 # No ack is required diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 68c70d9..fe5b056 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -15,17 +15,19 @@ class KeyedProducer(Producer): """ A producer which distributes messages to partitions based on the key - Args: - client - The kafka client instance - partitioner - A partitioner class that will be used to get the partition - to send the message to. Must be derived from Partitioner - async - If True, the messages are sent asynchronously via another + Arguments: + client: The kafka client instance + + Keyword Arguments: + partitioner: A partitioner class that will be used to get the partition + to send the message to. Must be derived from Partitioner + async: If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout """ def __init__(self, client, partitioner=None, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index a10fa8c..019395c 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -19,21 +19,23 @@ class SimpleProducer(Producer): """ A simple, round-robin producer. Each message goes to exactly one partition - Params: - client - The Kafka client instance to use - async - If True, the messages are sent asynchronously via another + Arguments: + client: The Kafka client instance to use + + Keyword Arguments: + async: If True, the messages are sent asynchronously via another thread (process). We will not wait for a response to these - req_acks - A value indicating the acknowledgements that the server must - receive before responding to the request - ack_timeout - Value (in milliseconds) indicating a timeout for waiting - for an acknowledgement - batch_send - If True, messages are send in batches - batch_send_every_n - If set, messages are send in batches of this size - batch_send_every_t - If set, messages are send after this timeout - random_start - If true, randomize the initial partition which the - the first message block will be published to, otherwise - if false, the first message block will always publish - to partition 0 before cycling through each partition + req_acks: A value indicating the acknowledgements that the server must + receive before responding to the request + ack_timeout: Value (in milliseconds) indicating a timeout for waiting + for an acknowledgement + batch_send: If True, messages are send in batches + batch_send_every_n: If set, messages are send in batches of this size + batch_send_every_t: If set, messages are send after this timeout + random_start: If true, randomize the initial partition which the + the first message block will be published to, otherwise + if false, the first message block will always publish + to partition 0 before cycling through each partition """ def __init__(self, client, async=False, req_acks=Producer.ACK_AFTER_LOCAL_WRITE, diff --git a/kafka/protocol.py b/kafka/protocol.py index a85c7eb..2a39de6 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -185,18 +185,18 @@ class KafkaProtocol(object): """ Encode some ProduceRequest structs - Params - ====== - client_id: string - correlation_id: int - payloads: list of ProduceRequest - acks: How "acky" you want the request to be - 0: immediate response - 1: written to disk by the leader - 2+: waits for this many number of replicas to sync - -1: waits for all replicas to be in sync - timeout: Maximum time the server will wait for acks from replicas. - This is _not_ a socket timeout + Arguments: + client_id: string + correlation_id: int + payloads: list of ProduceRequest + acks: How "acky" you want the request to be + 0: immediate response + 1: written to disk by the leader + 2+: waits for this many number of replicas to sync + -1: waits for all replicas to be in sync + timeout: Maximum time the server will wait for acks from replicas. + This is _not_ a socket timeout + """ payloads = [] if payloads is None else payloads grouped_payloads = group_by_topic_and_partition(payloads) @@ -225,9 +225,9 @@ class KafkaProtocol(object): """ Decode bytes to a ProduceResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode + """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -248,14 +248,13 @@ class KafkaProtocol(object): """ Encodes some FetchRequest structs - Params - ====== - client_id: string - correlation_id: int - payloads: list of FetchRequest - max_wait_time: int, how long to block waiting on min_bytes of data - min_bytes: int, the minimum number of bytes to accumulate before - returning the response + Arguments: + client_id: string + correlation_id: int + payloads: list of FetchRequest + max_wait_time: int, how long to block waiting on min_bytes of data + min_bytes: int, the minimum number of bytes to accumulate before + returning the response """ payloads = [] if payloads is None else payloads @@ -284,9 +283,8 @@ class KafkaProtocol(object): """ Decode bytes to a FetchResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -333,9 +331,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) @@ -360,11 +357,10 @@ class KafkaProtocol(object): """ Encode a MetadataRequest - Params - ====== - client_id: string - correlation_id: int - topics: list of strings + Arguments: + client_id: string + correlation_id: int + topics: list of strings """ if payloads is None: topics = [] if topics is None else topics @@ -388,9 +384,8 @@ class KafkaProtocol(object): """ Decode bytes to a MetadataResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id, numbrokers), cur) = relative_unpack('>ii', data, 0) @@ -439,12 +434,11 @@ class KafkaProtocol(object): """ Encode some OffsetCommitRequest structs - Params - ====== - client_id: string - correlation_id: int - group: string, the consumer group you are committing offsets for - payloads: list of OffsetCommitRequest + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are committing offsets for + payloads: list of OffsetCommitRequest """ grouped_payloads = group_by_topic_and_partition(payloads) @@ -470,9 +464,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetCommitResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) ((num_topics,), cur) = relative_unpack('>i', data, cur) @@ -491,12 +484,11 @@ class KafkaProtocol(object): """ Encode some OffsetFetchRequest structs - Params - ====== - client_id: string - correlation_id: int - group: string, the consumer group you are fetching offsets for - payloads: list of OffsetFetchRequest + Arguments: + client_id: string + correlation_id: int + group: string, the consumer group you are fetching offsets for + payloads: list of OffsetFetchRequest """ grouped_payloads = group_by_topic_and_partition(payloads) @@ -522,9 +514,8 @@ class KafkaProtocol(object): """ Decode bytes to an OffsetFetchResponse - Params - ====== - data: bytes to decode + Arguments: + data: bytes to decode """ ((correlation_id,), cur) = relative_unpack('>i', data, 0) @@ -547,10 +538,10 @@ def create_message(payload, key=None): """ Construct a Message - Params - ====== - payload: bytes, the payload to send to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payload: bytes, the payload to send to Kafka + key: bytes, a key used for partition routing (optional) + """ return Message(0, 0, key, payload) @@ -562,10 +553,10 @@ def create_gzip_message(payloads, key=None): The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka. - Params - ====== - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ message_set = KafkaProtocol._encode_message_set( [create_message(payload, key) for payload in payloads]) @@ -583,10 +574,10 @@ def create_snappy_message(payloads, key=None): The given payloads will be encoded, compressed, and sent as a single atomic message to Kafka. - Params - ====== - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ message_set = KafkaProtocol._encode_message_set( [create_message(payload, key) for payload in payloads]) diff --git a/kafka/queue.py b/kafka/queue.py index ada495f..26cafad 100644 --- a/kafka/queue.py +++ b/kafka/queue.py @@ -129,13 +129,12 @@ class KafkaQueue(object): Messages are buffered in the producer thread until producer_flush_timeout or producer_flush_buffer is reached. - Params - ====== - client: KafkaClient object - topic: str, the topic name - partitions: list of ints, the partions to consume from - producer_config: dict, see below - consumer_config: dict, see below + Arguments: + client: KafkaClient object + topic: str, the topic name + partitions: list of ints, the partions to consume from + producer_config: dict, see below + consumer_config: dict, see below Consumer Config =============== @@ -184,14 +183,12 @@ class KafkaQueue(object): """ Consume a message from Kafka - Params - ====== - block: boolean, default True - timeout: int, number of seconds to wait when blocking, default None + Arguments: + block: boolean, default True + timeout: int, number of seconds to wait when blocking, default None - Returns - ======= - msg: str, the payload from Kafka + Returns: + msg: str, the payload from Kafka """ return self.in_queue.get(block, timeout).payload @@ -199,11 +196,10 @@ class KafkaQueue(object): """ Send a message to Kafka - Params - ====== - msg: std, the message to send - block: boolean, default True - timeout: int, number of seconds to wait when blocking, default None + Arguments: + msg: std, the message to send + block: boolean, default True + timeout: int, number of seconds to wait when blocking, default None """ self.out_queue.put(msg, block, timeout) diff --git a/kafka/util.py b/kafka/util.py index 72ac521..622b1a7 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -103,10 +103,12 @@ class ReentrantTimer(object): A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer) - t: timer interval in milliseconds - fn: a callable to invoke - args: tuple of args to be passed to function - kwargs: keyword arguments to be passed to function + Arguments: + + t: timer interval in milliseconds + fn: a callable to invoke + args: tuple of args to be passed to function + kwargs: keyword arguments to be passed to function """ def __init__(self, t, fn, *args, **kwargs): |