diff options
author | David Arthur <mumrah@gmail.com> | 2012-09-27 10:36:37 -0400 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2012-09-27 10:36:37 -0400 |
commit | 7486d8704f21acb454c23764162d22a59cdfa3e8 (patch) | |
tree | 786ac7afd4828f584011604eb4498ed1e40871d1 /kafka.py | |
parent | ab273c459fba1d47163b1ac5f1f7a3f0fe6f0215 (diff) | |
download | kafka-python-7486d8704f21acb454c23764162d22a59cdfa3e8.tar.gz |
Starting on unit tests
Updated several methods in KafkaClient to be classmethods. Updated some
inline documentation also
Diffstat (limited to 'kafka.py')
-rw-r--r-- | kafka.py | 148 |
1 files changed, 110 insertions, 38 deletions
@@ -31,9 +31,9 @@ FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"]) OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"]) -def gzip_compress(payload): +def gzip_encode(payload): buf = StringIO() - f = gzip.GzipFile(fileobj=buf, mode='w') + f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6) f.write(payload) f.close() buf.seek(0) @@ -41,7 +41,7 @@ def gzip_compress(payload): buf.close() return out -def gzip_decompress(payload): +def gzip_decode(payload): buf = StringIO(payload) f = gzip.GzipFile(fileobj=buf, mode='r') out = f.read() @@ -49,6 +49,7 @@ def gzip_decompress(payload): buf.close() return out + def length_prefix_message(msg): """ Prefix a message with it's length as an int @@ -84,9 +85,10 @@ class KafkaClient(object): ATTRIBUTE_CODEC_MASK = 0x03 - def __init__(self, host, port): + def __init__(self, host, port, bufsize=1024): self.host = host self.port = port + self.bufsize = bufsize self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.connect((host, port)) self._sock.settimeout(10) @@ -117,7 +119,7 @@ class KafkaClient(object): # Response iterator total = 0 while total < (size-2): - resp = self._sock.recv(1024) + resp = self._sock.recv(self.bufsize) log.debug("Read %d bytes from Kafka", len(resp)) if resp == "": raise Exception("Underflow") @@ -133,7 +135,8 @@ class KafkaClient(object): data += chunk return data - def encode_message(self, message): + @classmethod + def encode_message(cls, message): """ Encode a Message from a Message tuple @@ -163,20 +166,26 @@ class KafkaClient(object): msg = struct.pack('>BBi%ds' % len(message.payload), message.magic, message.attributes, message.crc, message.payload) else: - raise Exception("Unknown message version: %d" % message.magic) + raise Exception("Unexpected magic number: %d" % message.magic) msg = length_prefix_message(msg) 
log.debug("Encoded %s as %r" % (message, msg)) return msg - def encode_message_set(self, messages): - # TODO document + @classmethod + def encode_message_set(cls, messages): + """ + Encode a MessageSet + + One or more concatenated Messages + """ message_set = "" for message in messages: - encoded_message = self.encode_message(message) + encoded_message = cls.encode_message(message) message_set += encoded_message return message_set - def encode_produce_request(self, produceRequest): + @classmethod + def encode_produce_request(cls, produceRequest): """ Encode a ProduceRequest @@ -198,16 +207,41 @@ class KafkaClient(object): KafkaClient.PRODUCE_KEY, len(topic), topic, partition, len(message_set), message_set) return req - def encode_multi_produce_request(self, produceRequests): - # TODO document + @classmethod + def encode_multi_produce_request(cls, produceRequests): + """ + Encode a MultiProducerRequest + + Params + ====== + produceRequests: list of ProduceRequest objects + + Returns + ======= + Encoded request + + Wire Format + =========== + <MultiProducerRequest> ::= <request-key> <num> <ProduceRequests> + <num> ::= <int16> + <ProduceRequests> ::= <ProduceRequest> [ <ProduceRequests> ] + <ProduceRequest> ::= <topic> <partition> <len> <MessageSet> + <topic> ::= <topic-length><string> + <topic-length> ::= <int16> + <partition> ::= <int32> + <len> ::= <int32> + + num is the number of ProduceRequests being encoded + """ req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests)) for (topic, partition, messages) in produceRequests: - message_set = self.encode_message_set(messages) + message_set = cls.encode_message_set(messages) req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)), len(topic), topic, partition, len(message_set), message_set) return req - def encode_fetch_request(self, fetchRequest): + @classmethod + def encode_fetch_request(cls, fetchRequest): """ Encode a FetchRequest message @@ -228,7 +262,8 @@ class KafkaClient(object): 
KafkaClient.FETCH_KEY, len(topic), topic, partition, offset, size) return req - def encode_multi_fetch_request(self, fetchRequests): + @classmethod + def encode_multi_fetch_request(cls, fetchRequests): """ Encode the MultiFetchRequest message from a list of FetchRequest objects @@ -260,7 +295,8 @@ class KafkaClient(object): req += struct.pack('>H%dsiqi' % len(topic), len(topic), topic, partition, offset, size) return req - def encode_offset_request(self, offsetRequest): + @classmethod + def encode_offset_request(cls, offsetRequest): """ Encode an OffsetRequest message @@ -281,43 +317,57 @@ class KafkaClient(object): req = struct.pack('>HH%dsiqi' % len(topic), KafkaClient.OFFSET_KEY, len(topic), topic, partition, offset, maxOffsets) return req - def decode_message(self, data): + @classmethod + def decode_message(cls, data): """ Decode a Message - Since a Message can actually contained a compressed payload of multiple nested Messages, - this method returns a generator. + Verify crc and decode the Message. A compressed Message's payload is actually + an encoded MessageSet. This allows Messages to be nested within Messages and + as such, this method will recurse. 
+ + Params + ====== + data, bytes + + Returns + ======= + Generator of Messages (depth-first) """ - # TODO document N = len(data) (magic,) = struct.unpack('>B', data[0:1]) - if magic == 0: # v0 Message - # Read crc; check the crc; append the message + if magic == 0: + # version 0 (crc,) = struct.unpack('>i', data[1:5]) payload = data[5:N] assert zlib.crc32(payload) == crc msg = Message(magic, None, crc, payload) log.debug("Got v0 Message, %s", msg) yield msg - elif magic == 1: # v1 Message - # Read attributes, crc; check the crc; append the message + elif magic == 1: + # version 1 (att, crc) = struct.unpack('>Bi', data[1:6]) payload = data[6:N] assert zlib.crc32(payload) == crc - # Uncompressed, just a single Message if att & KafkaClient.ATTRIBUTE_CODEC_MASK == 0: + # Uncompressed, just a single Message msg = Message(magic, att, crc, payload) log.debug("Got v1 Message, %s", msg) yield msg elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 1: - gz = gzip_decompress(payload) - (msgs, _) = self.read_message_set(gz) + # Gzip encoded Message + gz = gzip_decode(payload) + (msgs, _) = cls.read_message_set(gz) for msg in msgs: yield msg + elif att & KafkaClient.ATTRIBUTE_CODEC_MASK == 2: + # Snappy encoded Message + raise NotImplementedError("Snappy codec is not yet supported") else: raise RuntimeError("Unsupported compression type: %d" % (att & KafkaClient.ATTRIBUTE_CODEC_MASK)) - def read_message_set(self, data): + @classmethod + def read_message_set(cls, data): """ Read a MessageSet @@ -363,7 +413,7 @@ class KafkaClient(object): cur += 4 # Decode the message(s) - for m in self.decode_message(data[cur:cur+N]): + for m in cls.decode_message(data[cur:cur+N]): msgs.append(m) # Advance the cursor @@ -376,15 +426,37 @@ class KafkaClient(object): # Advanced User API # ######################### - def create_message_from_string(self, payload): - #TODO document + @classmethod + def create_message(cls, payload): + """ + Create a standard Message + + Params + ====== + payload, bytes + 
+ Returns + ======= + A Message tuple + """ return Message(1, 0, zlib.crc32(payload), payload) - def create_gzipped_message(self, *payloads): - #TODO document - messages = [self.create_message_from_string(payload) for payload in payloads] - message_set = self.encode_message_set(messages) - gzipped = gzip_compress(message_set) + @classmethod + def create_gzip_message(cls, *payloads): + """ + Create a Gzip encoded Message + + Params + ====== + payloads, list of messages (bytes) to be encoded + + Returns + ======= + A Message tuple + """ + messages = [cls.create_message(payload) for payload in payloads] + message_set = cls.encode_message_set(messages) + gzipped = gzip_encode(message_set) return Message(1, 0x00 | (KafkaClient.ATTRIBUTE_CODEC_MASK & 0x01), zlib.crc32(gzipped), gzipped) def send_message_set(self, produceRequest): @@ -522,7 +594,7 @@ class KafkaClient(object): partition: int payloads: strings """ - messages = tuple([create_message_from_string(payload) for payload in payloads]) + messages = tuple([create_message(payload) for payload in payloads]) self.send_message_set(ProduceRequest(topic, partition, messages)) def iter_messages(self, topic, partition, offset, size, auto=True): |