author    David Arthur <mumrah@gmail.com>  2013-04-02 20:38:55 -0400
committer David Arthur <mumrah@gmail.com>  2013-04-02 20:38:55 -0400
commit    7c604697be54aac6989d7e8b7fb6f8e41699ade9 (patch)
tree      2800a085a224fd273c00c2404424b014ec74faed
parent    53da81c1761764aec4bcee49cdd3464c925fed09 (diff)
parent    bff6cae885e9750b468d475dac3ec712bf69e853 (diff)
download  kafka-python-0.8.tar.gz

Merge branch 'master' into 0.8

Conflicts:
	README.md
	kafka-src
	kafka/client.py
	kafka/consumer.py
	kafka/protocol.py
	setup.py
	test/integration.py
-rw-r--r--  README.md           | 105
-rw-r--r--  kafka/client.py     |   2
-rw-r--r--  kafka/protocol.py   |   2
-rw-r--r--  setup.py            |   2
-rw-r--r--  test/integration.py |   4
5 files changed, 62 insertions, 53 deletions
diff --git a/README.md b/README.md
index cf0ebe3..08fbd67 100644
--- a/README.md
+++ b/README.md
@@ -1,12 +1,13 @@
-# Kakfa Python client
+# Kafka Python client
-This module provides low-level protocol support Apache Kafka. It implements the five basic request types
-(and their responses): Produce, Fetch, MultiFetch, MultiProduce, and Offsets. Gzip and Snappy compression
-is also supported.
+This module provides low-level protocol support for Apache Kafka as well as
+high-level consumer and producer classes. Request batching is supported by the
+protocol as well as broker-aware request routing. Gzip and Snappy compression
+is also supported for message sets.
-Compatible with Apache Kafka 0.7x. Tested against 0.8
+Compatible with Apache Kafka 0.8.0
-http://incubator.apache.org/kafka/
+http://kafka.apache.org/
# License
@@ -14,7 +15,47 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
# Status
-Current version is 0.2-alpha. This version is under development, APIs are subject to change
+I'm following the version numbers of Kafka, plus one number to indicate the
+version of this project. The current version is 0.8.0-1. This version is under
+development; APIs are subject to change.
+
+# Usage
+
+## High level
+
+```python
+from kafka.client import KafkaClient
+from kafka.consumer import SimpleConsumer
+from kafka.producer import SimpleProducer
+
+kafka = KafkaClient("localhost", 9092)
+
+producer = SimpleProducer(kafka, "my-topic")
+producer.send_messages("some message")
+producer.send_messages("this method", "is variadic")
+
+consumer = SimpleConsumer(kafka, "my-group", "my-topic")
+for message in consumer:
+    print(message)
+
+kafka.close()
+```
+
+## Low level
+
+```python
+from kafka.client import KafkaClient
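+# NOTE: ProduceRequest and KafkaProtocol must also be imported; where they
+# live depends on the kafka-python version in use.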
+kafka = KafkaClient("localhost", 9092)
+req = ProduceRequest(topic="my-topic", partition=1,
+                     messages=[KafkaProtocol.encode_message("some message")])
+resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
+kafka.close()
+
+resps[0].topic # "my-topic"
+resps[0].partition # 1
+resps[0].error # 0 (hopefully)
+resps[0].offset # offset of the first message sent in this request
+```
# Install
@@ -60,11 +101,14 @@ pip install python-snappy
# Tests
-Some of the tests will fail if Snappy is not installed. These tests will throw NotImplementedError. If you see other failures,
-they might be bugs - so please report them!
+Some of the tests will fail if Snappy is not installed. These tests will throw
+NotImplementedError. If you see other failures, they might be bugs - so please
+report them!
## Run the unit tests
+_These are broken at the moment_
+
```shell
python -m test.unit
```
@@ -81,46 +125,11 @@ cd kafka-src
./sbt package
```
-Then from the root directory, run the integration tests
+Next start up a ZooKeeper server on localhost:2181
```shell
-python -m test.integration
-```
-
-# Usage
-
-## High level
-
-```python
-from kafka.client import KafkaClient
-from kafka.consumer import SimpleConsumer
-from kafka.producer import SimpleProducer
-
-kafka = KafkaClient("localhost", 9092)
-
-producer = SimpleProducer(kafka, "my-topic")
-producer.send_messages("some message")
-producer.send_messages("this method", "is variadic")
-
-consumer = SimpleConsumer(kafka, "my-group", "my-topic")
-for message in consumer:
-    print(message)
-
-kafka.close()
+/opt/zookeeper/bin/zkServer.sh start
```
-## Low level
-
-```python
-from kafka.client import KafkaClient
-kafka = KafkaClient("localhost", 9092)
-req = ProduceRequest(topic="my-topic", partition=1,
-                     messages=[KafkaProdocol.encode_message("some message")])
-resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
-kafka.close()
-
-resps[0].topic # "my-topic"
-resps[0].partition # 1
-resps[0].error # 0 (hopefully)
-resps[0].offset # offset of the first message sent in this request
-```
+This will actually start up real Kafka brokers and send messages to them
+using the client.
diff --git a/kafka/client.py b/kafka/client.py
index 1c7fc93..5a58b2e 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -233,7 +233,7 @@ class KafkaClient(object):
    def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
        raise NotImplementedError("Broker-managed offsets not supported in 0.8")
        resps = self._send_broker_aware_request(payloads,
-            partial(KafkaProtocol.encode_offset_commit_fetch, group=group),
+            partial(KafkaProtocol.encode_offset_fetch_request, group=group),
            KafkaProtocol.decode_offset_fetch_response)
        out = []
        for resp in resps:
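The one-line fix above swaps in the correct encoder method name. For context, a minimal sketch of the calling convention it relies on: `_send_broker_aware_request` takes an encoder callable, so `functools.partial` pre-binds the `group` argument first. The stub class and its signature below are illustrative assumptions, not the real `KafkaProtocol` API.

```python
# Sketch: pre-binding an argument with functools.partial, as the fixed line does.
from functools import partial

class KafkaProtocolStub(object):
    @staticmethod
    def encode_offset_fetch_request(client_id, correlation_id, group=None, payloads=None):
        # The real method builds wire-format request bytes; this stub just
        # shows which arguments remain free after partial() binds `group`.
        return b""

encoder = partial(KafkaProtocolStub.encode_offset_fetch_request, group="my-group")
# The client can now invoke the encoder without knowing about `group`:
request_bytes = encoder("my-client-id", 1234, payloads=[])
```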
diff --git a/kafka/protocol.py b/kafka/protocol.py
index fc866ad..94a7f2a 100644
--- a/kafka/protocol.py
+++ b/kafka/protocol.py
@@ -354,7 +354,6 @@ class KafkaProtocol(object):
        ======
        data: bytes to decode
        """
-        data = data[2:]  # TODO remove me when versionId is removed
        ((correlation_id,), cur) = relative_unpack('>i', data, 0)
        (client_id, cur) = read_short_string(data, cur)
        ((num_topics,), cur) = relative_unpack('>i', data, cur)
@@ -398,7 +397,6 @@ class KafkaProtocol(object):
        data: bytes to decode
        """
-        data = data[2:]  # TODO remove me when versionId is removed
        ((correlation_id,), cur) = relative_unpack('>i', data, 0)
        (client_id, cur) = read_short_string(data, cur)
        ((num_topics,), cur) = relative_unpack('>i', data, cur)
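Both hunks above drop a `data = data[2:]` that skipped a 2-byte versionId field before decoding, so decoding now begins directly at the correlation id. A rough sketch of the header layout these decoders walk, using plain `struct` calls in place of the library's `relative_unpack`/`read_short_string` helpers; the field layout is inferred from the surrounding lines, not taken from the library.

```python
import struct

def decode_header(data):
    # 4-byte big-endian correlation id, then a length-prefixed short string,
    # then a 4-byte topic count -- mirroring the three reads in the hunks above.
    (correlation_id,) = struct.unpack('>i', data[0:4])
    (strlen,) = struct.unpack('>h', data[4:6])
    client_id = data[6:6 + strlen]
    cur = 6 + strlen
    (num_topics,) = struct.unpack('>i', data[cur:cur + 4])
    return correlation_id, client_id, num_topics

# Example: correlation_id=7, client_id="kp", one topic
sample = struct.pack('>ih2si', 7, 2, b"kp", 1)
print(decode_header(sample))  # (7, b'kp', 1)
```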
diff --git a/setup.py b/setup.py
index 18f9fcc..bf61b56 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@ from distutils.core import setup
setup(
    name="kafka-python",
-    version="0.2-alpha",
+    version="0.8.0-1",
    author="David Arthur",
    author_email="mumrah@gmail.com",
    url="https://github.com/mumrah/kafka-python",
diff --git a/test/integration.py b/test/integration.py
index a1fcce7..de91130 100644
--- a/test/integration.py
+++ b/test/integration.py
@@ -6,6 +6,7 @@ import shlex
import shutil
import socket
import subprocess
+import sys
import tempfile
from threading import Thread, Event
import time
@@ -73,7 +74,8 @@ class KafkaFixture(Thread):
        args = shlex.split("java -cp %s org.apache.zookeeper.ZooKeeperMain create /%s kafka-python" % (cp, self.zk_chroot))
        proc = subprocess.Popen(args)
        ret = proc.wait()
-        assert ret == 0
+        if ret != 0:
+            sys.exit(1)
        # Start Kafka
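The last hunk replaces an `assert` on the subprocess exit code with an explicit check: `assert` statements are stripped when Python runs with `-O`, while the explicit check always executes. A self-contained sketch of the pattern; the command here is a portable stand-in for the fixture's ZooKeeperMain invocation.

```python
import subprocess
import sys

# Stand-in command that exits 0; the fixture runs ZooKeeperMain here.
proc = subprocess.Popen([sys.executable, "-c", "pass"])
ret = proc.wait()
if ret != 0:
    # An assert would be skipped under `python -O`; an explicit exit is not.
    sys.exit(1)
```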