From eac51e9c68c50f15962b6c785ede92cb3d512a17 Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Fri, 1 Mar 2013 15:12:21 -0500
Subject: Integration tests passing

---
 kafka/client.py | 1239 +++++++++++++++++++++++++++++++++----------------------
 1 file changed, 741 insertions(+), 498 deletions(-)

diff --git a/kafka/client.py b/kafka/client.py
index c25e2d2..5da3919 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -1,631 +1,874 @@
-from collections import namedtuple
+import base64
+from collections import namedtuple, defaultdict
+from functools import partial
+from itertools import groupby, count
 import logging
+from operator import attrgetter
 import socket
 import struct
+import time
 import zlib
 
 from .codec import gzip_encode, gzip_decode
 from .codec import snappy_encode, snappy_decode
+from .util import read_short_string, read_int_string
+from .util import relative_unpack
+from .util import write_short_string, write_int_string
+from .util import group_by_topic_and_partition
+from .util import BufferUnderflowError, ChecksumError
 
 log = logging.getLogger("kafka")
 
-error_codes = {
-    -1: "UnknownError",
-    0: None,
-    1: "OffsetOutOfRange",
-    2: "InvalidMessage",
-    3: "WrongPartition",
-    4: "InvalidFetchSize"
-}
-
-class KafkaException(Exception):
-    def __init__(self, errorType):
-        self.errorType = errorType
-    def __str__(self):
-        return str(self.errorType)
+###############
+#   Structs   #
+###############
 
-Message = namedtuple("Message", ["magic", "attributes", "crc", "payload"])
-FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"])
+# Request payloads
 ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
-OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])
-
-def length_prefix_message(msg):
-    """
-    Prefix a message with it's length as an int
-    """
-    return struct.pack('>i', len(msg)) + msg
-
-class KafkaClient(object):
+FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"])
+OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"])
+OffsetCommitRequest = namedtuple("OffsetCommitRequest", ["topic", "partition", "offset", "metadata"])
+OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"])
+
+# Response payloads
+ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"])
+FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"])
+OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"])
+OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"])
+OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"])
+BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"])
+PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr"])
+
+# Other useful structs
+OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])
+Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
+TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
+
+class ErrorMapping(object):
+    # Many of these are not actually used by the client
+    UNKNOWN                    = -1
+    NO_ERROR                   = 0
+    OFFSET_OUT_OF_RANGE        = 1
+    INVALID_MESSAGE            = 2
+    UNKNOWN_TOPIC_OR_PARTITION = 3
+    INVALID_FETCH_SIZE         = 4
+    LEADER_NOT_AVAILABLE       = 5
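
For orientation, a minimal sketch (not part of the patch; the topic, partition,
and offset values are made up) of how these request payload structs get filled
in before being handed to the encoders below:

    # Hypothetical values -- any topic/partition/offset would do
    fetch = FetchRequest(topic="test-topic", partition=0, offset=0, max_bytes=4096)
    offset_req = OffsetRequest(topic="test-topic", partition=0, time=-1, max_offsets=1)

    # namedtuples give both positional and named access, which the encoders rely on
    assert fetch.max_bytes == 4096
    assert fetch == FetchRequest("test-topic", 0, 0, 4096)
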
+    NOT_LEADER_FOR_PARTITION   = 6
+    REQUEST_TIMED_OUT          = 7
+    BROKER_NOT_AVAILABLE       = 8
+    REPLICA_NOT_AVAILABLE      = 9
+    MESSAGE_SIZE_TOO_LARGE     = 10
+    STALE_CONTROLLER_EPOCH     = 11
+    OFFSET_METADATA_TOO_LARGE  = 12
+
+class KafkaProtocol(object):
     """
-    Request Structure
-    =================
-
-    <Request>     ::= <len> <request-key> <payload>
-    <len>         ::= <int32>
-    <request-key> ::= 0 | 1 | 2 | 3 | 4
-    <payload>     ::= <ProduceRequest> | <FetchRequest> | <MultiFetchRequest> | <MultiProduceRequest> | <OffsetRequest>
-
-    Response Structure
-    ==================
-
-    <Response>    ::= <len> <err> <payload>
-    <len>         ::= <int32>
-    <err>         ::= -1 | 0 | 1 | 2 | 3 | 4
-    <payload>     ::= <ProduceResponse> | <FetchResponse> | <MultiFetchResponse> | <MultiProduceResponse> | <OffsetResponse>
-
-    Messages are big-endian byte order
+    Class to encapsulate all of the protocol encoding/decoding. This class does not
+    have any state associated with it; it is purely for organization.
     """
-
-    PRODUCE_KEY      = 0
-    FETCH_KEY        = 1
-    MULTIFETCH_KEY   = 2
-    MULTIPRODUCE_KEY = 3
-    OFFSET_KEY       = 4
+    PRODUCE_KEY       = 0
+    FETCH_KEY         = 1
+    OFFSET_KEY        = 2
+    METADATA_KEY      = 3
+    OFFSET_COMMIT_KEY = 6
+    OFFSET_FETCH_KEY  = 7
 
     ATTRIBUTE_CODEC_MASK = 0x03
+    CODEC_NONE = 0x00
+    CODEC_GZIP = 0x01
+    CODEC_SNAPPY = 0x02
 
-    def __init__(self, host, port, bufsize=1024):
-        self.host = host
-        self.port = port
-        self.bufsize = bufsize
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._sock.connect((host, port))
-        self._sock.settimeout(10)
-        log.debug("Connected to %s on %d", host, port)
-
-    def __copy__(self):
-        return KafkaClient(self.host, self.port, self.bufsize)
+    ###################
+    #   Private API   #
+    ###################
 
-    ######################
-    #   Protocol Stuff   #
-    ######################
-
-    def _consume_response_iter(self):
+    @classmethod
+    def _encode_message_header(cls, client_id, correlation_id, request_key):
         """
-        This method handles the response header and error messages. It
-        then returns an iterator for the chunks of the response
+        Encode the common request envelope
         """
-        log.debug("Handling response from Kafka")
-        # Header
-        resp = self._sock.recv(6)
-        if resp == "":
-            raise Exception("Got no response from Kafka")
-        (size, err) = struct.unpack('>iH', resp)
-
-        log.debug("About to read %d bytes from Kafka", size-2)
-        # Handle error
-        error = error_codes.get(err)
-        if error is not None:
-            raise KafkaException(error)
-
-        # Response iterator
-        total = 0
-        while total < (size-2):
-            resp = self._sock.recv(self.bufsize)
-            log.debug("Read %d bytes from Kafka", len(resp))
-            if resp == "":
-                raise Exception("Underflow")
-            total += len(resp)
-            yield resp
+        return struct.pack('>hhih%ds' % len(client_id),
+                           request_key,       # ApiKey
+                           0,                 # ApiVersion
+                           correlation_id,    # CorrelationId
+                           len(client_id),    # ClientId size
+                           client_id)         # ClientId
 
-    def _consume_response(self):
+    @classmethod
+    def _encode_message_set(cls, messages):
         """
-        Fully consumer the response iterator
+        Encode a MessageSet. Unlike other arrays in the protocol, MessageSets are
+        not length-prefixed
+
+        Format
+        ======
+        MessageSet => [Offset MessageSize Message]
+          Offset => int64
+          MessageSize => int32
         """
-        data = ""
-        for chunk in self._consume_response_iter():
-            data += chunk
-        return data
+        message_set = ""
+        for message in messages:
+            encoded_message = KafkaProtocol._encode_message(message)
+            message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message)
+        return message_set
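
As a quick sanity check on the envelope layout, a sketch (not part of the patch;
it assumes the Python 2 byte strings used throughout this file, and the
correlation id is arbitrary):

    import struct

    header = KafkaProtocol._encode_message_header("kafka-python", 1234, KafkaProtocol.FETCH_KEY)
    # '>hhih' covers ApiKey, ApiVersion, CorrelationId and the ClientId size: 10 bytes
    (api_key, api_version, correlation_id, client_id_size) = struct.unpack('>hhih', header[:10])
    assert (api_key, api_version, correlation_id) == (KafkaProtocol.FETCH_KEY, 0, 1234)
    assert client_id_size == len("kafka-python") and header[10:] == "kafka-python"
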
 
     @classmethod
-    def encode_message(cls, message):
+    def _encode_message(cls, message):
         """
-        Encode a Message from a Message tuple
+        Encode a single message.
 
-        Params
+        The magic number of a message is a format version number. The only
+        supported magic number right now is zero.
+
+        Format
         ======
-        message: Message
-
-        Wire Format
-        ===========
-        <Message>     ::= <Message-0> | <Message-1>
-        <Message-0>   ::= <N> 0 <header-0> <payload>
-        <Message-1>   ::= <N> 1 <header-1> <payload>
-        <N>           ::= <int32>
-        <header-0>    ::= <crc>
-        <header-1>    ::= <attributes><crc>
-        <attributes>  ::= <int8>
-        <crc>         ::= <int32>
-        <payload>     ::= <bytes>
-
-        The crc is a crc32 checksum of the message payload. The attributes are bitmask
-        used for indicating the compression algorithm.
+        Message => Crc MagicByte Attributes Key Value
+          Crc => int32
+          MagicByte => int8
+          Attributes => int8
+          Key => bytes
+          Value => bytes
         """
         if message.magic == 0:
-            msg = struct.pack('>Bi%ds' % len(message.payload),
-                              message.magic, message.crc, message.payload)
-        elif message.magic == 1:
-            msg = struct.pack('>BBi%ds' % len(message.payload),
-                              message.magic, message.attributes, message.crc, message.payload)
+            msg = struct.pack('>BB', message.magic, message.attributes)
+            msg += write_int_string(message.key)
+            msg += write_int_string(message.value)
+            crc = zlib.crc32(msg)
+            msg = struct.pack('>i%ds' % len(msg), crc, msg)
         else:
             raise Exception("Unexpected magic number: %d" % message.magic)
-        msg = length_prefix_message(msg)
-        log.debug("Encoded %s as %r" % (message, msg))
         return msg
+
     @classmethod
-    def encode_message_set(cls, messages):
+    def _decode_message_set_iter(cls, data):
         """
-        Encode a MessageSet
+        Iteratively decode a MessageSet
 
-        One or more concatenated Messages
+        Reads repeated elements of (offset, message), calling _decode_message to decode a
+        single message. Since compressed messages contain further MessageSets, these two
+        methods have been decoupled so that they may recurse easily.
         """
-        message_set = ""
-        for message in messages:
-            encoded_message = cls.encode_message(message)
-            message_set += encoded_message
-        return message_set
+        cur = 0
+        while cur < len(data):
+            try:
+                ((offset, ), cur) = relative_unpack('>q', data, cur)
+                (msg, cur) = read_int_string(data, cur)
+                for (offset, message) in KafkaProtocol._decode_message(msg, offset):
+                    yield OffsetAndMessage(offset, message)
+            except BufferUnderflowError:
+                # If we get a partial read of a message, stop
+                return
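
relative_unpack, read_int_string, and BufferUnderflowError come from
kafka/util.py, which is not part of this diff. A rough sketch of the behavior
the decoder above assumes from those helpers (the _sketch suffix marks these as
illustrations, not the real implementations):

    import struct

    from kafka.util import BufferUnderflowError

    def relative_unpack_sketch(fmt, data, cur):
        # struct.unpack at an offset, also returning the advanced offset
        size = struct.calcsize(fmt)
        if len(data) < cur + size:
            raise BufferUnderflowError("Not enough data left")
        return (struct.unpack(fmt, data[cur:cur + size]), cur + size)

    def read_int_string_sketch(data, cur):
        # A 4-byte big-endian length N followed by N bytes; N == -1 encodes None
        ((strlen,), cur) = relative_unpack_sketch('>i', data, cur)
        if strlen == -1:
            return (None, cur)
        if len(data) < cur + strlen:
            raise BufferUnderflowError("Not enough data left")
        return (data[cur:cur + strlen], cur + strlen)
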
+ """ + ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0) + if crc != zlib.crc32(data[4:]): + raise ChecksumError("Message checksum failed") + + (key, cur) = read_int_string(data, cur) + (value, cur) = read_int_string(data, cur) + if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE: + yield (offset, Message(magic, att, key, value)) + elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP: + gz = gzip_decode(value) + for (offset, message) in KafkaProtocol._decode_message_set_iter(gz): + yield (offset, message) + elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY: + snp = snappy_decode(value) + for (offset, message) in KafkaProtocol._decode_message_set_iter(snp): + yield (offset, message) + + ################## + # Public API # + ################## @classmethod - def encode_multi_produce_request(cls, produceRequests): + def create_message(cls, payload, key=None): """ - Encode a MultiProducerRequest + Construct a Message Params ====== - produceRequest: list of ProduceRequest objects - - Returns - ======= - Encoded request - - Wire Format - =========== - ::= - ::= - ::= [ ] - ::= - ::= - ::= - ::= - ::= - - num is the number of ProduceRequests being encoded - """ - req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests)) - for (topic, partition, messages) in produceRequests: - message_set = cls.encode_message_set(messages) - req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)), - len(topic), topic, partition, len(message_set), message_set) - return req - - @classmethod - def encode_fetch_request(cls, fetchRequest): - """ - Encode a FetchRequest message - - Wire Format - =========== - ::= - ::= 1 - ::= - ::= - ::= - ::= - ::= - - The request-key (1) is encoded as a short (int16). - """ - (topic, partition, offset, size) = fetchRequest - req = struct.pack('>HH%dsiqi' % len(topic), - KafkaClient.FETCH_KEY, len(topic), topic, partition, offset, size) - return req + payload: bytes, the payload to send to Kafka + key: bytes, a key used for partition routing (optional) + """ + return Message(0, 0, key, payload) @classmethod - def encode_multi_fetch_request(cls, fetchRequests): + def create_gzip_message(cls, payloads, key=None): """ - Encode the MultiFetchRequest message from a list of FetchRequest objects + Construct a Gzipped Message containing multiple Messages + + The given payloads will be encoded, compressed, and sent as a single atomic + message to Kafka. Params ====== - fetchRequests: list of FetchRequest - - Returns - ======= - req: bytes, The message to send to Kafka - - Wire Format - =========== - ::= [ ] - ::= 2 - ::= - ::= [ ] - ::= - ::= - ::= - ::= - ::= - ::= - - The request-key (2) is encoded as a short (int16). 
- """ - req = struct.pack('>HH', KafkaClient.MULTIFETCH_KEY, len(fetchRequests)) - for (topic, partition, offset, size) in fetchRequests: - req += struct.pack('>H%dsiqi' % len(topic), len(topic), topic, partition, offset, size) - return req + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ + message_set = KafkaProtocol._encode_message_set( + [KafkaProtocol.create_message(payload) for payload in payloads]) + gzipped = gzip_encode(message_set) + return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped) @classmethod - def encode_offset_request(cls, offsetRequest): + def create_snappy_message(cls, payloads, key=None): """ - Encode an OffsetRequest message + Construct a Snappy Message containing multiple Messages - Wire Format - =========== - ::=