diff options
-rw-r--r-- | README.md | 105 | ||||
-rw-r--r-- | kafka/client.py | 2 | ||||
-rw-r--r-- | kafka/protocol.py | 2 | ||||
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | test/integration.py | 4 |
5 files changed, 62 insertions, 53 deletions
@@ -1,12 +1,13 @@ -# Kakfa Python client +# Kafka Python client -This module provides low-level protocol support Apache Kafka. It implements the five basic request types -(and their responses): Produce, Fetch, MultiFetch, MultiProduce, and Offsets. Gzip and Snappy compression -is also supported. +This module provides low-level protocol support for Apache Kafka as well as +high-level consumer and producer classes. Request batching is supported by the +protocol as well as broker-aware request routing. Gzip and Snappy compression +is also supported for message sets. -Compatible with Apache Kafka 0.7x. Tested against 0.8 +Compatible with Apache Kafka 0.8.0 -http://incubator.apache.org/kafka/ +http://kafka.apache.org/ # License @@ -14,7 +15,47 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE` # Status -Current version is 0.2-alpha. This version is under development, APIs are subject to change +I'm following the version numbers of Kafka, plus one number to indicate the +version of this project. The current version is 0.8.0-1. This version is under +development, APIs are subject to change. + +# Usage + +## High level + +```python +from kafka.client import KafkaClient +from kafka.consumer import SimpleConsumer +from kafka.producer import SimpleProducer + +kafka = KafkaClient("localhost", 9092) + +producer = SimpleProducer(kafka, "my-topic") +producer.send_messages("some message") +producer.send_messages("this method", "is variadic") + +consumer = SimpleConsumer(kafka, "my-group", "my-topic") +for message in consumer: + print(message) + +kafka.close() +``` + +## Low level + +```python +from kafka.client import KafkaClient +kafka = KafkaClient("localhost", 9092) +req = ProduceRequest(topic="my-topic", partition=1, + messages=[KafkaProdocol.encode_message("some message")]) +resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) +kafka.close() + +resps[0].topic # "my-topic" +resps[0].partition # 1 +resps[0].error # 0 (hopefully) +resps[0].offset # offset of the first message sent in this request +``` # Install @@ -60,11 +101,14 @@ pip install python-snappy # Tests -Some of the tests will fail if Snappy is not installed. These tests will throw NotImplementedError. If you see other failures, -they might be bugs - so please report them! +Some of the tests will fail if Snappy is not installed. These tests will throw +NotImplementedError. If you see other failures, they might be bugs - so please +report them! ## Run the unit tests +_These are broken at the moment_ + ```shell python -m test.unit ``` @@ -81,46 +125,11 @@ cd kafka-src ./sbt package ``` -Then from the root directory, run the integration tests +Next start up a ZooKeeper server on localhost:2181 ```shell -python -m test.integration -``` - -# Usage - -## High level - -```python -from kafka.client import KafkaClient -from kafka.consumer import SimpleConsumer -from kafka.producer import SimpleProducer - -kafka = KafkaClient("localhost", 9092) - -producer = SimpleProducer(kafka, "my-topic") -producer.send_messages("some message") -producer.send_messages("this method", "is variadic") - -consumer = SimpleConsumer(kafka, "my-group", "my-topic") -for message in consumer: - print(message) - -kafka.close() +/opt/zookeeper/bin/zkServer.sh start ``` -## Low level - -```python -from kafka.client import KafkaClient -kafka = KafkaClient("localhost", 9092) -req = ProduceRequest(topic="my-topic", partition=1, - messages=[KafkaProdocol.encode_message("some message")]) -resps = kafka.send_produce_request(payloads=[req], fail_on_error=True) -kafka.close() - -resps[0].topic # "my-topic" -resps[0].partition # 1 -resps[0].error # 0 (hopefully) -resps[0].offset # offset of the first message sent in this request -``` +This will actually start up real Kafka brokers and send messages in using the +client. diff --git a/kafka/client.py b/kafka/client.py index 1c7fc93..5a58b2e 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -233,7 +233,7 @@ class KafkaClient(object): def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None): raise NotImplementedError("Broker-managed offsets not supported in 0.8") resps = self._send_broker_aware_request(payloads, - partial(KafkaProtocol.encode_offset_commit_fetch, group=group), + partial(KafkaProtocol.encode_offset_fetch_request, group=group), KafkaProtocol.decode_offset_fetch_response) out = [] for resp in resps: diff --git a/kafka/protocol.py b/kafka/protocol.py index fc866ad..94a7f2a 100644 --- a/kafka/protocol.py +++ b/kafka/protocol.py @@ -354,7 +354,6 @@ class KafkaProtocol(object): ====== data: bytes to decode """ - data = data[2:] # TODO remove me when versionId is removed ((correlation_id,), cur) = relative_unpack('>i', data, 0) (client_id, cur) = read_short_string(data, cur) ((num_topics,), cur) = relative_unpack('>i', data, cur) @@ -398,7 +397,6 @@ class KafkaProtocol(object): data: bytes to decode """ - data = data[2:] # TODO remove me when versionId is removed ((correlation_id,), cur) = relative_unpack('>i', data, 0) (client_id, cur) = read_short_string(data, cur) ((num_topics,), cur) = relative_unpack('>i', data, cur) @@ -2,7 +2,7 @@ from distutils.core import setup setup( name="kafka-python", - version="0.2-alpha", + version="0.8.0-1", author="David Arthur", author_email="mumrah@gmail.com", url="https://github.com/mumrah/kafka-python", diff --git a/test/integration.py b/test/integration.py index a1fcce7..de91130 100644 --- a/test/integration.py +++ b/test/integration.py @@ -6,6 +6,7 @@ import shlex import shutil import socket import subprocess +import sys import tempfile from threading import Thread, Event import time @@ -73,7 +74,8 @@ class KafkaFixture(Thread): args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot)) proc = subprocess.Popen(args) ret = proc.wait() - assert ret == 0 + if ret != 0: + sys.exit(1) # Start Kafka |