summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2012-09-27 10:36:37 -0400
committerDavid Arthur <mumrah@gmail.com>2012-09-27 10:36:37 -0400
commit7486d8704f21acb454c23764162d22a59cdfa3e8 (patch)
tree786ac7afd4828f584011604eb4498ed1e40871d1
parentab273c459fba1d47163b1ac5f1f7a3f0fe6f0215 (diff)
downloadkafka-python-7486d8704f21acb454c23764162d22a59cdfa3e8.tar.gz
Starting on unit tests
Updated several methods in KafkaClient to be classmethods. Updated some inline documentation also
-rw-r--r--example.py4
-rw-r--r--kafka.py148
-rw-r--r--test.py44
3 files changed, 156 insertions, 40 deletions
diff --git a/example.py b/example.py
index 0204af1..6ec223c 100644
--- a/example.py
+++ b/example.py
@@ -3,7 +3,7 @@ import logging
from kafka import KafkaClient, FetchRequest, ProduceRequest
def produce_example(kafka):
- message = kafka.create_message_from_string("testing")
+ message = kafka.create_message("testing")
request = ProduceRequest("my-topic", 0, [message])
kafka.send_message_set(request)
@@ -15,7 +15,7 @@ def consume_example(kafka):
print(nextRequest)
def produce_gz_example(kafka):
- message = kafka.create_gzipped_message("this message was gzipped", "along with this one")
+ message = kafka.create_gzip_message("this message was gzipped", "along with this one")
request = ProduceRequest("my-topic", 0, [message])
kafka.send_message_set(request)
diff --git a/kafka.py b/kafka.py
index fbd415d..902a71e 100644
--- a/kafka.py
+++ b/kafka.py
@@ -31,9 +31,9 @@ FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])
-def gzip_compress(payload):
+def gzip_encode(payload):
buf = StringIO()
- f = gzip.GzipFile(fileobj=buf, mode='w')
+ f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
f.write(payload)
f.close()
buf.seek(0)
@@ -41,7 +41,7 @@ def gzip_compress(payload):
buf.close()
return out
-def gzip_decompress(payload):
+def gzip_decode(payload):
buf = StringIO(payload)
f = gzip.GzipFile(fileobj=buf, mode='r')
out = f.read()
@@ -49,6 +49,7 @@ def gzip_decompress(payload):
buf.close()
return out
+
def length_prefix_message(msg):
"""
Prefix a message with its length as an int
@@ -84,9 +85,10 @@ class KafkaClient(object):
ATTRIBUTE_CODEC_MASK = 0x03
- def __init__(self, host, port):
+ def __init__(self, host, port, bufsize=1024):
self.host = host
self.port = port
+ self.bufsize = bufsize
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect((host, port))
self._sock.settimeout(10)
@@ -117,7 +119,7 @@ class KafkaClient(object):
# Response iterator
total = 0
while total < (size-2):
- resp = self._sock.recv(1024)
+ resp = self._sock.recv(self.bufsize)
log.debug("Read %d bytes from Kafka", len(resp))
if resp == "":
raise Exception("Underflow")
@@ -133,7 +135,8 @@ class KafkaClient(object):
data += chunk
return data
- def encode_message(self, message):
+ @classmethod
+ def encode_message(cls, message):
"""
Encode a Message from a Message tuple
@@ -163,20 +166,26 @@ class KafkaClient(object):
msg = struct.pack('>BBi%ds' % len(message.payload),
message.magic, message.attributes, message.crc, message.payload)
else:
- raise Exception("Unknown message version: %d" % message.magic)
+ raise Exception("Unexpected magic number: %d" % message.magic)
msg = length_prefix_message(msg)
log.debug("Encoded %s as %r" % (message, msg))
return msg
- def encode_message_set(self, messages):
- # TODO document
+ @classmethod
+ def encode_message_set(cls, messages):
+ """
+ Encode a MessageSet
+
+ One or more concatenated Messages
+ """
message_set = ""
for message in messages:
- encoded_message = self.encode_message(message)
+ encoded_message = cls.encode_message(message)
message_set += encoded_message
return message_set
- def encode_produce_request(self, produceRequest):
+ @classmethod
+ def encode_produce_request(cls, produceRequest):
"""
Encode a ProduceRequest
@@ -198,16 +207,41 @@ class KafkaClient(object):
KafkaClient.PRODUCE_KEY, len(topic), topic, partition, len(message_set), message_set)
return req
- def encode_multi_produce_request(self, produceRequests):
- # TODO document
+ @classmethod
+ def encode_multi_produce_request(cls, produceRequests):
+ """
+ Encode a MultiProducerRequest
+
+ Params
+ ======
+ produceRequests: list of ProduceRequest objects
+
+ Returns
+ =======
+ Encoded request
+
+ Wire Format
+ ===========
+ <MultiProducerRequest> ::= <request-key> <num> <ProduceRequests>
+ <num> ::= <int16>
+ <ProduceRequests> ::= <ProduceRequest> [ <ProduceRequests> ]
+ <ProduceRequest> ::= <topic> <partition> <len> <MessageSet>
+ <topic> ::= <topic-length><string>
+ <topic-length> ::= <int16>
+ <partition> ::= <int32>
+ <len> ::= <int32>
+
+ num is the number of ProduceRequests being encoded
+ """
req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests))
for (topic, partition, messages) in produceRequests:
- message_set = self.encode_message_set(messages)
+ message_set = cls.encode_message_set(messages)
req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)),
len(topic), topic, partition, len(message_set), message_set)
return req
- def encode_fetch_request(self, fetchRequest):
+ @classmethod
+ def encode_fetch_request(cls, fetchRequest):
"""
Encode a FetchRequest message
@@ -228,7 +262,8 @@ class KafkaClient(object):
KafkaClient.FETCH_KEY, len(topic), topic, partition, offset, size)
return req
- def encode_multi_fetch_request(self, fetchRequests):
+ @classmethod
+ def encode_multi_fetch_request(cls, fetchRequests):
"""
Encode the MultiFetchRequest message from a list of FetchRequest objects
@@ -260,7 +295,8 @@ class KafkaClient(object):
req += struct.pack('>H%dsiqi' % len(topic), len(topic), topic, partition, offset, size)
return req
- def encode_offset_request(self, offsetRequest):
+ @classmethod
+ def encode_offset_request(cls, offsetRequest):
"""
Encode an OffsetRequest message
@@ -281,43 +317,57 @@ class KafkaClient(object):
req = struct.pack('>HH%dsiqi' % len(topic), KafkaClient.OFFSET_KEY, len(topic), topic, partition, offset, maxOffsets)
return req
- def decode_message(self, data):
+ @classmethod
+ def decode_message(cls, data):
"""
Decode a Message
- Since a Message can actually contained a compressed payload of multiple nested Messages,
- this method returns a generator.
+ Verify crc and decode the Message. A compressed Message's payload is actually
+ an encoded MessageSet. This allows Messages to be nested within Messages and
+ as such, this method will recurse.
+
+ Params
+ ======
+ data, bytes
+
+ Returns
+ =======
+ Generator of Messages (depth-first)
"""
- # TODO document
N = len(data)
(magic,) = struct.unpack('>B', data[0:1])
- if magic == 0: # v0 Message
- # Read crc; check the crc; append the message
+ if magic == 0:
+ # version 0
(crc,) = struct.unpack('>i', data[1:5])
payload = data[5:N]
assert zlib.crc32(payload) == crc
msg = Message(magic, None, crc, payload)
log.debug("Got v0 Message, %s", msg)
yield msg
- elif magic == 1: # v1 Message
- # Read attributes, crc; check the crc; append the message
+ elif magic == 1:
+ # version 1
(att, crc) = struct.unpack('>Bi', data[1:6])
payload = data[6:N]
assert zlib.crc32(payload) == crc
- # Uncompressed, just a single Message
if att & KafkaClient.ATTRIBUTE_CODEC_MASK == 0:
+ # Uncompressed, just a single Message
msg = Message(magic, att, crc, payload)
log.debug("Got v1 Message, %s", msg)
yield msg
elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 1:
- gz = gzip_decompress(payload)
- (msgs, _) = self.read_message_set(gz)
+ # Gzip encoded Message
+ gz = gzip_decode(payload)
+ (msgs, _) = cls.read_message_set(gz)
for msg in msgs:
yield msg
+ elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 2:
+ # Snappy encoded Message
+ raise NotImplementedError("Snappy codec is not yet supported")
else:
raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK))
- def read_message_set(self, data):
+ @classmethod
+ def read_message_set(cls, data):
"""
Read a MessageSet
@@ -363,7 +413,7 @@ class KafkaClient(object):
cur += 4
# Decode the message(s)
- for m in self.decode_message(data[cur:cur+N]):
+ for m in cls.decode_message(data[cur:cur+N]):
msgs.append(m)
# Advance the cursor
@@ -376,15 +426,37 @@ class KafkaClient(object):
# Advanced User API #
#########################
- def create_message_from_string(self, payload):
- #TODO document
+ @classmethod
+ def create_message(cls, payload):
+ """
+ Create a standard Message
+
+ Params
+ ======
+ payload, bytes
+
+ Returns
+ =======
+ A Message tuple
+ """
return Message(1, 0, zlib.crc32(payload), payload)
- def create_gzipped_message(self, *payloads):
- #TODO document
- messages = [self.create_message_from_string(payload) for payload in payloads]
- message_set = self.encode_message_set(messages)
- gzipped = gzip_compress(message_set)
+ @classmethod
+ def create_gzip_message(cls, *payloads):
+ """
+ Create a Gzip encoded Message
+
+ Params
+ ======
+ payloads, list of messages (bytes) to be encoded
+
+ Returns
+ =======
+ A Message tuple
+ """
+ messages = [cls.create_message(payload) for payload in payloads]
+ message_set = cls.encode_message_set(messages)
+ gzipped = gzip_encode(message_set)
return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped)
def send_message_set(self, produceRequest):
@@ -522,7 +594,7 @@ class KafkaClient(object):
partition: int
payloads: strings
"""
- messages = tuple([create_message_from_string(payload) for payload in payloads])
+ messages = tuple([create_message(payload) for payload in payloads])
self.send_message_set(ProduceRequest(topic, partition, messages))
def iter_messages(self, topic, partition, offset, size, auto=True):
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..3f5908e
--- /dev/null
+++ b/test.py
@@ -0,0 +1,44 @@
+import binascii
+import unittest
+
+from kafka import KafkaClient
+
+class TestMessage(unittest.TestCase):
+ def test_message_simple(self):
+ msg = KafkaClient.create_message("testing")
+ enc = KafkaClient.encode_message(msg)
+ expect = "\x00\x00\x00\r\x01\x00\xe8\xf3Z\x06testing"
+ self.assertEquals(enc, expect)
+ (messages, read) = KafkaClient.read_message_set(enc)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0], msg)
+
+ def test_message_list(self):
+ msgs = [
+ KafkaClient.create_message("one"),
+ KafkaClient.create_message("two"),
+ KafkaClient.create_message("three")
+ ]
+ enc = KafkaClient.encode_message_set(msgs)
+ expect = ("\x00\x00\x00\t\x01\x00zl\x86\xf1one\x00\x00\x00\t\x01\x00\x11"
+ "\xca\x8aftwo\x00\x00\x00\x0b\x01\x00F\xc5\xd8\xf5three")
+ self.assertEquals(enc, expect)
+ (messages, read) = KafkaClient.read_message_set(enc)
+ self.assertEquals(len(messages), 3)
+ self.assertEquals(messages[0].payload, "one")
+ self.assertEquals(messages[1].payload, "two")
+ self.assertEquals(messages[2].payload, "three")
+
+
+ def test_message_gzip(self):
+ msg = KafkaClient.create_gzip_message("one", "two", "three")
+ enc = KafkaClient.encode_message(msg)
+ # Can't check the bytes directly since Gzip is non-deterministic
+ (messages, read) = KafkaClient.read_message_set(enc)
+ self.assertEquals(len(messages), 3)
+ self.assertEquals(messages[0].payload, "one")
+ self.assertEquals(messages[1].payload, "two")
+ self.assertEquals(messages[2].payload, "three")
+
+if __name__ == '__main__':
+ unittest.main()