From c448bf367036d3a3c04ee553987da6be64820d00 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 2 Oct 2012 12:20:57 -0400 Subject: Renaming kafka.py to client.py --- README.md | 5 + example.py | 3 +- kafka/client.py | 610 ++++++++++++++++++++++++++++++++++++++++++++++++++++ kafka/kafka.py | 610 ---------------------------------------------------- setup.py | 2 +- test/integration.py | 2 +- test/unit.py | 2 +- 7 files changed, 619 insertions(+), 615 deletions(-) create mode 100644 kafka/client.py delete mode 100644 kafka/kafka.py diff --git a/README.md b/README.md index ce257f1..540fc9f 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,10 @@ Copyright 2012, David Arthur under Apache License, v2.0. See `LICENSE` This project is very much alpha. The API is in flux and not all the features are fully implemented. +# Install + +Install with your favorite package manager + # Tests ## Run the unit tests @@ -43,6 +47,7 @@ python -m test.integration ## Send a message to a topic ```python + from kafka.client import KafkaClient kafka = KafkaClient("localhost", 9092) kafka.send_messages_simple("my-topic", "some message") kafka.close() diff --git a/example.py b/example.py index 286bfdb..ced32dd 100644 --- a/example.py +++ b/example.py @@ -1,6 +1,6 @@ import logging -from kafka import KafkaClient, FetchRequest, ProduceRequest +from kafka.client import KafkaClient, FetchRequest, ProduceRequest def produce_example(kafka): message = kafka.create_message("testing") @@ -26,7 +26,6 @@ def main(): consume_example(kafka) kafka.close() - if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) main() diff --git a/kafka/client.py b/kafka/client.py new file mode 100644 index 0000000..0cde87f --- /dev/null +++ b/kafka/client.py @@ -0,0 +1,610 @@ +from collections import namedtuple +from cStringIO import StringIO +import logging +import gzip +import select +import socket +import struct +import zlib + +from .codec import gzip_encode, gzip_decode + +log = logging.getLogger("kafka") + +error_codes = { + -1: "UnknownError", + 0: None, + 1: "OffsetOutOfRange", + 2: "InvalidMessage", + 3: "WrongPartition", + 4: "InvalidFetchSize" +} + +class KafkaException(Exception): + def __init__(self, errorType): + self.errorType = errorType + def __str__(self): + return str(errorType) + +Message = namedtuple("Message", ["magic", "attributes", "crc", "payload"]) +FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"]) +ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"]) +OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"]) + +def length_prefix_message(msg): + """ + Prefix a message with it's length as an int + """ + return struct.pack('>i', len(msg)) + msg + +class KafkaClient(object): + """ + Request Structure + ================= + + ::= + ::= + ::= 0 | 1 | 2 | 3 | 4 + ::= | | | | + + Response Structure + ================== + + ::= + ::= + ::= -1 | 0 | 1 | 2 | 3 | 4 + ::= | | | | + + Messages are big-endian byte order + """ + + PRODUCE_KEY = 0 + FETCH_KEY = 1 + MULTIFETCH_KEY = 2 + MULTIPRODUCE_KEY = 3 + OFFSET_KEY = 4 + + ATTRIBUTE_CODEC_MASK = 0x03 + + def __init__(self, host, port, bufsize=1024): + self.host = host + self.port = port + self.bufsize = bufsize + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.connect((host, port)) + self._sock.settimeout(10) + log.debug("Connected to %s on %d", host, port) + + ###################### + # Protocol Stuff # + ###################### + + def _consume_response_iter(self): + """ + This method handles the response header and error messages. It + then returns an iterator for the chunks of the response + """ + log.debug("Handling response from Kafka") + # Header + resp = self._sock.recv(6) + if resp == "": + raise Exception("Got no response from Kafka") + (size, err) = struct.unpack('>iH', resp) + + log.debug("About to read %d bytes from Kafka", size-2) + # Handle error + error = error_codes.get(err) + if error is not None: + raise KafkaException(error) + + # Response iterator + total = 0 + while total < (size-2): + resp = self._sock.recv(self.bufsize) + log.debug("Read %d bytes from Kafka", len(resp)) + if resp == "": + raise Exception("Underflow") + total += len(resp) + yield resp + + def _consume_response(self): + """ + Fully consumer the response iterator + """ + data = "" + for chunk in self._consume_response_iter(): + data += chunk + return data + + @classmethod + def encode_message(cls, message): + """ + Encode a Message from a Message tuple + + Params + ====== + message: Message + + Wire Format + =========== + ::= | + ::= 0 + ::= 1 + ::= + ::= + ::= + ::= + ::= + ::= + + The crc is a crc32 checksum of the message payload. The attributes are bitmask + used for indicating the compression algorithm. + """ + if message.magic == 0: + msg = struct.pack('>Bi%ds' % len(message.payload), + message.magic, message.crc, message.payload) + elif message.magic == 1: + msg = struct.pack('>BBi%ds' % len(message.payload), + message.magic, message.attributes, message.crc, message.payload) + else: + raise Exception("Unexpected magic number: %d" % message.magic) + msg = length_prefix_message(msg) + log.debug("Encoded %s as %r" % (message, msg)) + return msg + + @classmethod + def encode_message_set(cls, messages): + """ + Encode a MessageSet + + One or more concatenated Messages + """ + message_set = "" + for message in messages: + encoded_message = cls.encode_message(message) + message_set += encoded_message + return message_set + + @classmethod + def encode_produce_request(cls, produceRequest): + """ + Encode a ProduceRequest + + Wire Format + =========== + ::= + ::= 0 + ::= + ::= + ::= + ::= + + The request-key (0) is encoded as a short (int16). len is the length of the proceeding MessageSet + """ + (topic, partition, messages) = produceRequest + message_set = cls.encode_message_set(messages) + log.debug("Sending MessageSet: %r" % message_set) + req = struct.pack('>HH%dsii%ds' % (len(topic), len(message_set)), + KafkaClient.PRODUCE_KEY, len(topic), topic, partition, len(message_set), message_set) + return req + + @classmethod + def encode_multi_produce_request(cls, produceRequests): + """ + Encode a MultiProducerRequest + + Params + ====== + produceRequest: list of ProduceRequest objects + + Returns + ======= + Encoded request + + Wire Format + =========== + ::= + ::= + ::= [ ] + ::= + ::= + ::= + ::= + ::= + + num is the number of ProduceRequests being encoded + """ + req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests)) + for (topic, partition, messages) in produceRequests: + message_set = cls.encode_message_set(messages) + req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)), + len(topic), topic, partition, len(message_set), message_set) + return req + + @classmethod + def encode_fetch_request(cls, fetchRequest): + """ + Encode a FetchRequest message + + Wire Format + =========== + ::= + ::= 1 + ::= + ::= + ::= + ::= + ::= + + The request-key (1) is encoded as a short (int16). + """ + (topic, partition, offset, size) = fetchRequest + req = struct.pack('>HH%dsiqi' % len(topic), + KafkaClient.FETCH_KEY, len(topic), topic, partition, offset, size) + return req + + @classmethod + def encode_multi_fetch_request(cls, fetchRequests): + """ + Encode the MultiFetchRequest message from a list of FetchRequest objects + + Params + ====== + fetchRequests: list of FetchRequest + + Returns + ======= + req: bytes, The message to send to Kafka + + Wire Format + =========== + ::= [ ] + ::= 2 + ::= + ::= [ ] + ::= + ::= + ::= + ::= + ::= + ::= + + The request-key (2) is encoded as a short (int16). + """ + req = struct.pack('>HH', KafkaClient.MULTIFETCH_KEY, len(fetchRequests)) + for (topic, partition, offset, size) in fetchRequests: + req += struct.pack('>H%dsiqi' % len(topic), len(topic), topic, partition, offset, size) + return req + + @classmethod + def encode_offset_request(cls, offsetRequest): + """ + Encode an OffsetRequest message + + Wire Format + =========== + ::=