From 0bc2afe910e29431cf6effad6ba3464d4c10597e Mon Sep 17 00:00:00 2001 From: David Arthur Date: Tue, 20 Nov 2012 10:16:15 -0500 Subject: Adding client_fetch_size to queue interface Also more docs --- kafka/queue.py | 37 ++++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) (limited to 'kafka') diff --git a/kafka/queue.py b/kafka/queue.py index b86b1db..d4f5b6c 100644 --- a/kafka/queue.py +++ b/kafka/queue.py @@ -9,12 +9,13 @@ from .client import KafkaClient, FetchRequest, ProduceRequest log = logging.getLogger("kafka") class KafkaConsumerProcess(Process): - def __init__(self, client, topic, partition, out_queue, barrier, consumer_sleep=200): + def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200): self.client = copy(client) self.topic = topic self.partition = partition self.out_queue = out_queue self.barrier = barrier + self.consumer_fetch_size = consumer_fetch_size self.consumer_sleep = consumer_sleep / 1000. log.info("Initializing %s" % self) Process.__init__(self) @@ -26,7 +27,7 @@ class KafkaConsumerProcess(Process): def run(self): self.barrier.wait() log.info("Starting %s" % self) - fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize) + fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.consumer_fetch_size) while True: if self.barrier.is_set() == False: log.info("Shutdown %s" % self) @@ -91,6 +92,10 @@ class KafkaQueue(object): """ KafkaQueue a Queue-like object backed by a Kafka producer and some number of consumers + Messages are eagerly loaded by the consumer in batches of size consumer_fetch_size. + Messages are buffered in the producer thread until producer_flush_timeout or + producer_flush_buffer is reached. + Params ====== client: KafkaClient object @@ -101,6 +106,8 @@ class KafkaQueue(object): Consumer Config =============== + consumer_fetch_size: int, number of bytes to fetch in one call to Kafka. Default + is 1024 consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches the end of a partition. 
Default is 200 @@ -133,12 +140,36 @@ class KafkaQueue(object): self.barrier.set() def get(self, block=True, timeout=None): + """ + Consume a message from Kafka + + Params + ====== + block: boolean, default True + timeout: int, number of seconds to wait when blocking, default None + + Returns + ======= + msg: str, the payload from Kafka + """ return self.in_queue.get(block, timeout).payload def put(self, msg, block=True, timeout=None): - return self.out_queue.put(msg, block, timeout) + """ + Send a message to Kafka + + Params + ====== + msg: std, the message to send + block: boolean, default True + timeout: int, number of seconds to wait when blocking, default None + """ + self.out_queue.put(msg, block, timeout) def close(self): + """ + Close the internal queues and Kafka consumers/producer + """ self.in_queue.close() self.out_queue.close() self.barrier.clear() -- cgit v1.2.1 From 71fef1b1555c2fb15a89411a5a6f79baebe4d3ae Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 13 Feb 2013 10:22:13 -0500 Subject: Starting work on 0.8 compat --- kafka/client08.py | 524 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ kafka/util.py | 38 ++++ 2 files changed, 562 insertions(+) create mode 100644 kafka/client08.py create mode 100644 kafka/util.py (limited to 'kafka') diff --git a/kafka/client08.py b/kafka/client08.py new file mode 100644 index 0000000..f120f37 --- /dev/null +++ b/kafka/client08.py @@ -0,0 +1,524 @@ +import base64 +from collections import namedtuple, defaultdict +from functools import partial +from itertools import groupby, count +import logging +from operator import attrgetter +import socket +import struct +import time +import zlib + +from .codec import gzip_encode, gzip_decode +from .codec import snappy_encode, snappy_decode +from .util import read_short_string, read_int_string +from .util import relative_unpack +from .util import write_short_string, write_int_string + +log = logging.getLogger("kafka") + +# Request payloads +ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"]) +FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "maxBytes"]) +OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"]) + +# Response payloads +ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) +FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"]) +OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offset"]) +BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"]) +PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partitionId", "leader", "replicas", "isr"]) + +# Other useful structs +OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"]) +Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) +TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partitionId"]) + +class ErrorMapping(object): + Unknown = -1 + NoError = 0 + OffsetOutOfRange = 1 + InvalidMessage = 2 + UnknownTopicOrPartition = 3 + InvalidFetchSize = 4 + LeaderNotAvailable = 5 + NotLeaderForPartition = 6 + RequestTimedOut = 7 + BrokerNotAvailable = 8 + ReplicaNotAvailable = 9 + MessageSizeTooLarge = 10 + StaleControllerEpoch = 11 + OffsetMetadataTooLarge = 12 + +class KafkaProtocol(object): + PRODUCE_KEY = 0 + FETCH_KEY = 1 + OFFSET_KEY = 2 + METADATA_KEY = 3 + + ATTRIBUTE_CODEC_MASK = 0x03 + + @classmethod + def encode_message_header(cls, clientId, 
correlationId, requestKey): + return struct.pack('>HHiH%ds' % len(clientId), + requestKey, # ApiKey + 0, # ApiVersion + correlationId, # CorrelationId + len(clientId), # + clientId) # ClientId + + @classmethod + def encode_message_set(cls, messages): + message_set = "" + for message in messages: + encoded_message = KafkaProtocol.encode_message(message) + message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message) + return message_set + + @classmethod + def encode_message(cls, message): + if message.magic == 0: + msg = struct.pack('>BB', message.magic, message.attributes) + msg += write_int_string(message.key) + msg += write_int_string(message.value) + crc = zlib.crc32(msg) + msg = struct.pack('>i%ds' % len(msg), crc, msg) + else: + raise Exception("Unexpected magic number: %d" % message.magic) + return msg + + @classmethod + def create_message(cls, value): + return Message(0, 0, "foo", value) + + @classmethod + def create_gzip_message(cls, value): + message_set = KafkaProtocol.encode_message_set([KafkaProtocol.create_message(value)]) + gzipped = gzip_encode(message_set) + return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), "foo", gzipped) + + @classmethod + def decode_message_set_iter(cls, data): + """ + Decode a MessageSet, iteratively + + Reads repeated elements of (offset, message), calling decode_message to decode a + single message. Since compressed messages contain futher MessageSets, these two methods + have been decoupled so that they may recurse easily. + + Format + ====== + MessageSet => [Offset MessageSize Message] + Offset => int64 + MessageSize => int32 + + N.B., the repeating element of the MessageSet is not preceded by an int32 like other + repeating elements in this protocol + """ + cur = 0 + while cur < len(data): + ((offset, ), cur) = relative_unpack('>q', data, cur) + (msg, cur) = read_int_string(data, cur) + for (offset, message) in KafkaProtocol.decode_message(msg, offset): + yield OffsetAndMessage(offset, message) + + @classmethod + def decode_message(cls, data, offset): + """ + Decode a single Message + + The only caller of this method is decode_message_set_iter. They are decoupled to + support nested messages (compressed MessageSets). The offset is actually read from + decode_message_set_iter (it is part of the MessageSet payload). 
+ + Format + ======== + Message => Crc MagicByte Attributes Key Value + Crc => int32 + MagicByte => int8 + Attributes => int8 + Key => bytes + Value => bytes + """ + ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0) + assert crc == zlib.crc32(data[4:]) + (key, cur) = read_int_string(data, cur) + (value, cur) = read_int_string(data, cur) + if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 0: + yield (offset, Message(magic, att, key, value)) + elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 1: + gz = gzip_decode(value) + for (offset, message) in KafkaProtocol.decode_message_set_iter(gz): + yield (offset, message) + elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 2: + snp = snappy_decode(value) + for (offset, message) in KafkaProtocol.decode_message_set_iter(snp): + yield (offset, message) + + @classmethod + def encode_metadata_request(cls, clientId, correlationId, *topics): + # Header + message = cls.encode_message_header(clientId, correlationId, KafkaProtocol.METADATA_KEY) + + # TopicMetadataRequest + message += struct.pack('>i', len(topics)) + for topic in topics: + message += struct.pack('>H%ds' % len(topic), len(topic), topic) + + # Length-prefix the whole thing + return write_int_string(message) + + @classmethod + def decode_metadata_response(cls, data): + # TopicMetadataResponse + cur = 0 + ((correlationId, numBrokers), cur) = relative_unpack('>ii', data, cur) + brokers = {} + for i in range(numBrokers): + ((nodeId, ), cur) = relative_unpack('>i', data, cur) + (host, cur) = read_short_string(data, cur) + ((port,), cur) = relative_unpack('>i', data, cur) + brokers[nodeId] = BrokerMetadata(nodeId, host, port) + + ((numTopics,), cur) = relative_unpack('>i', data, cur) + topicMetadata = {} + for i in range(numTopics): + ((topicError,), cur) = relative_unpack('>H', data, cur) + (topicName, cur) = read_short_string(data, cur) + ((numPartitions,), cur) = relative_unpack('>i', data, cur) + partitionMetadata = {} + for j in range(numPartitions): + ((partitionErrorCode, partitionId, leader, numReplicas), cur) = relative_unpack('>Hiii', data, cur) + (replicas, cur) = relative_unpack('>%di' % numReplicas, data, cur) + ((numIsr,), cur) = relative_unpack('>i', data, cur) + (isr, cur) = relative_unpack('>%di' % numIsr, data, cur) + partitionMetadata[partitionId] = PartitionMetadata(topicName, partitionId, leader, replicas, isr) + topicMetadata[topicName] = partitionMetadata + return (brokers, topicMetadata) + + @classmethod + def encode_produce_request(self, clientId, correlationId, payloads=[], acks=1, timeout=1000): + # Group the payloads by topic + sorted_payloads = sorted(payloads, key=attrgetter("topic")) + grouped_payloads = list(groupby(sorted_payloads, key=attrgetter("topic"))) + + # Pack the message header + message = struct.pack('>HHiH%ds' % len(clientId), + KafkaProtocol.PRODUCE_KEY, # ApiKey + 0, # ApiVersion + correlationId, # CorrelationId + len(clientId), # + clientId) # ClientId + + # Pack the message sets + message += struct.pack('>Hii', acks, timeout, len(grouped_payloads)) + for topic, payload in grouped_payloads: + payloads = list(payloads) + message += struct.pack('>H%dsi' % len(topic), len(topic), topic, len(payloads)) + for payload in payloads: + message_set = KafkaProtocol.encode_message_set(payload.messages) + message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set) + + # Length-prefix the whole thing + return struct.pack('>i%ds' % len(message), len(message), message) + + @classmethod + def decode_produce_response(cls, 
data): + ((correlationId, numTopics), cur) = relative_unpack('>ii', data, 0) + for i in range(numTopics): + ((strlen,), cur) = relative_unpack('>H', data, cur) + topic = data[cur:cur+strlen] + cur += strlen + ((numPartitions,), cur) = relative_unpack('>i', data, cur) + for i in range(numPartitions): + ((partition, error, offset), cur) = relative_unpack('>iHq', data, cur) + yield ProduceResponse(topic, partition, error, offset) + + @classmethod + def encode_fetch_request(cls, clientId, correlationId, payloads=[], replicaId=-1, maxWaitTime=100, minBytes=1024): + # Group the payloads by topic + sorted_payloads = sorted(payloads, key=attrgetter("topic")) + grouped_payloads = list(groupby(sorted_payloads, key=attrgetter("topic"))) + + # Pack the message header + message = struct.pack('>HHiH%ds' % len(clientId), + KafkaProtocol.FETCH_KEY, # ApiKey + 0, # ApiVersion + correlationId, # CorrelationId + len(clientId), # + clientId) # ClientId + + # Pack the FetchRequest + message += struct.pack('>iiii', + replicaId, # ReplicaId + maxWaitTime, # MaxWaitTime + minBytes, # MinBytes + len(grouped_payloads)) + for topic, payload in grouped_payloads: + payloads = list(payloads) + message += write_short_string(topic) + message += struct.pack('>i', len(payloads)) + for payload in payloads: + message += struct.pack('>iqi', payload.partition, payload.offset, payload.maxBytes) + + # Length-prefix the whole thing + return struct.pack('>i%ds' % len(message), len(message), message) + + @classmethod + def decode_fetch_response_iter(cls, data): + ((correlationId, numTopics), cur) = relative_unpack('>ii', data, 0) + for i in range(numTopics): + (topic, cur) = read_short_string(data, cur) + ((numPartitions,), cur) = relative_unpack('>i', data, cur) + for i in range(numPartitions): + ((partition, error, highwaterMarkOffset), cur) = relative_unpack('>iHq', data, cur) + (messageSet, cur) = read_int_string(data, cur) + yield FetchResponse(topic, partition, error, highwaterMarkOffset, KafkaProtocol.decode_message_set_iter(messageSet)) + + @classmethod + def encode_offset_request(cls, clientId, correlationId, payloads=[], replicaId=-1): + # Group the payloads by topic + sorted_payloads = sorted(payloads, key=attrgetter("topic")) + grouped_payloads = list(groupby(sorted_payloads, key=attrgetter("topic"))) + + # Pack the message header + message = struct.pack('>HHiH%ds' % len(clientId), + KafkaProtocol.OFFSET_KEY, # ApiKey + 0, # ApiVersion + correlationId, # CorrelationId + len(clientId), # + clientId) # ClientId + + message += struct.pack('>ii', replicaId, len(grouped_payloads)) + + # Pack the OffsetRequest + for topic, payload in grouped_payloads: + payloads = list(payloads) + message += write_short_string(topic) + message += struct.pack('>i', len(payloads)) + for payload in payloads: + message += struct.pack('>iqi', payload.partition, payload.time, payload.maxOffsets) + + # Length-prefix the whole thing + return struct.pack('>i%ds' % len(message), len(message), message) + + @classmethod + def decode_offset_response(cls, data): + ((correlationId, numTopics), cur) = relative_unpack('>ii', data, 0) + for i in range(numTopics): + (topic, cur) = read_short_string(data, cur) + ((numPartitions,), cur) = relative_unpack('>i', data, cur) + for i in range(numPartitions): + ((partition, error, offset), cur) = relative_unpack('>iHq', data, cur) + yield OffsetResponse(topic, partition, error, offset) + + + +class Conn(object): + """ + A socket connection to a single Kafka broker + """ + def __init__(self, host, port, bufsize=1024): + 
self.host = host + self.port = port + self.bufsize = bufsize + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.connect((host, port)) + self._sock.settimeout(10) + + def close(self): + self._sock.close() + + def _consume_response(self): + """ + Fully consumer the response iterator + """ + data = "" + for chunk in self._consume_response_iter(): + data += chunk + return data + + def _consume_response_iter(self): + """ + This method handles the response header and error messages. It + then returns an iterator for the chunks of the response + """ + log.debug("Handling response from Kafka") + + # Header + resp = self._sock.recv(4) + if resp == "": + raise Exception("Got no response from Kafka") + (size,) = struct.unpack('>i', resp) + + messageSize = size - 4 + log.debug("About to read %d bytes from Kafka", messageSize) + + # Response iterator + total = 0 + while total < messageSize: + resp = self._sock.recv(self.bufsize) + log.debug("Read %d bytes from Kafka", len(resp)) + if resp == "": + raise Exception("Underflow") + total += len(resp) + yield resp + + def send(self, requestId, payload): + #print(repr(payload)) + sent = self._sock.sendall(payload) + if sent == 0: + raise RuntimeError("Kafka went away") + self.data = self._consume_response() + #print(repr(self.data)) + + def recv(self, requestId): + return self.data + +class KafkaConnection(object): + """ + Low-level API for Kafka 0.8 + """ + + # ClientId for Kafka + CLIENT_ID = "kafka-python" + + # Global correlation ids + ID_GEN = count() + + def __init__(self, host, port, bufsize=1024): + # We need one connection to bootstrap + self.bufsize = bufsize + self.conns = {(host, port): Conn(host, port, bufsize)} + self.brokers = {} # broker Id -> BrokerMetadata + self.topics_to_brokers = {} # topic Id -> broker Id + self.load_metadata_for_topics() + + def get_conn_for_broker(self, broker): + "Get or create a connection to a broker" + if (broker.host, broker.port) not in self.conns: + self.conns[(broker.host, broker.port)] = Conn(broker.host, broker.port, self.bufsize) + return self.conns[(broker.host, broker.port)] + + def next_id(self): + return KafkaConnection.ID_GEN.next() + + def load_metadata_for_topics(self, *topics): + """ + Discover brokers and metadata for a set of topics + """ + requestId = self.next_id() + request = KafkaProtocol.encode_metadata_request(KafkaConnection.CLIENT_ID, requestId, *topics) + conn = self.conns.values()[0] # Just get the first one in the list + conn.send(requestId, request) + response = conn.recv(requestId) + (brokers, topics) = KafkaProtocol.decode_metadata_response(response) + log.debug("Broker metadata: %s", brokers) + log.debug("Topic metadata: %s", topics) + self.brokers.update(brokers) + self.topics_to_brokers = {} + for topic, partitions in topics.items(): + for partition, meta in partitions.items(): + if meta.leader == -1: + log.info("Partition is unassigned, delay for 1s and retry") + time.sleep(1) + self.load_metadata_for_topics(topic) + return + else: + self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader] + + def get_leader_for_partition(self, topic, partition): + key = TopicAndPartition(topic, partition) + if key not in self.topics_to_brokers: + self.load_metadata_for_topics(topic) + return self.topics_to_brokers[key] + + def send_produce_request(self, payloads=[], fail_on_error=True, callback=None): + # Group the produce requests by topic+partition + sorted_payloads = sorted(payloads, key=lambda x: (x.topic, x.partition)) + grouped_payloads 
= groupby(sorted_payloads, key=lambda x: (x.topic, x.partition)) + + # Group the produce requests by which broker they go to + payloads_by_broker = defaultdict(list) + for (topic, partition), payload in grouped_payloads: + payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload) + + out = [] + # For each broker, send the list of request payloads + for broker, payloads in payloads_by_broker.items(): + conn = self.get_conn_for_broker(broker) + requestId = self.next_id() + request = KafkaProtocol.encode_produce_request(KafkaConnection.CLIENT_ID, requestId, payloads) + # Send the request + conn.send(requestId, request) + response = conn.recv(requestId) + for produce_response in KafkaProtocol.decode_produce_response(response): + # Check for errors + if fail_on_error == True and produce_response.error != 0: + raise Exception("ProduceRequest for %s failed with errorcode=%d", + (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error)) + # Run the callback + if callback is not None: + out.append(callback(produce_response)) + else: + out.append(produce_response) + return out + + def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None): + """ + Encode and send a FetchRequest + + Payloads are grouped by topic and partition so they can be pipelined to the same + brokers. + """ + # Group the produce requests by topic+partition + sorted_payloads = sorted(payloads, key=lambda x: (x.topic, x.partition)) + grouped_payloads = groupby(sorted_payloads, key=lambda x: (x.topic, x.partition)) + + # Group the produce requests by which broker they go to + payloads_by_broker = defaultdict(list) + for (topic, partition), payload in grouped_payloads: + payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload) + + out = [] + # For each broker, send the list of request payloads + for broker, payloads in payloads_by_broker.items(): + conn = self.get_conn_for_broker(broker) + requestId = self.next_id() + request = KafkaProtocol.encode_fetch_request(KafkaConnection.CLIENT_ID, requestId, payloads) + # Send the request + conn.send(requestId, request) + response = conn.recv(requestId) + for fetch_response in KafkaProtocol.decode_fetch_response_iter(response): + # Check for errors + if fail_on_error == True and fetch_response.error != 0: + raise Exception("FetchRequest %s failed with errorcode=%d", + (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error)) + # Run the callback + if callback is not None: + out.append(callback(fetch_response)) + else: + out.append(fetch_response) + return out + +if __name__ == "__main__": + # Bootstrap connection + conn = KafkaConnection("localhost", 9092) + + # Create some Messages + messages = (KafkaProtocol.create_gzip_message("GZIPPed"), + KafkaProtocol.create_message("not-gzipped")) + + # Create a ProduceRequest + produce = ProduceRequest("foo5", 0, messages) + + # Send the ProduceRequest + produce_resp = conn.send_produce_request([produce]) + + # Check for errors + for resp in produce_resp: + if resp.error != 0: + raise Exception("ProduceRequest failed with errorcode=%d", resp.error) + print resp + + diff --git a/kafka/util.py b/kafka/util.py new file mode 100644 index 0000000..6f27637 --- /dev/null +++ b/kafka/util.py @@ -0,0 +1,38 @@ +import struct + +def write_int_string(s): + return struct.pack('>i%ds' % len(s), len(s), s) + +def write_short_string(s): + return struct.pack('>H%ds' % len(s), len(s), s) + +def read_short_string(data, cur): + 
if len(data) < cur+2: + raise IOError("Not enough data left") + (strLen,) = struct.unpack('>H', data[cur:cur+2]) + if strLen == -1: + return (None, cur+2) + cur += 2 + if len(data) < cur+strLen: + raise IOError("Not enough data left") + out = data[cur:cur+strLen] + return (out, cur+strLen) + +def read_int_string(data, cur): + if len(data) < cur+4: + raise IOError("Not enough data left") + (strLen,) = struct.unpack('>i', data[cur:cur+4]) + if strLen == -1: + return (None, cur+4) + cur += 4 + if len(data) < cur+strLen: + raise IOError("Not enough data left") + out = data[cur:cur+strLen] + return (out, cur+strLen) + +def relative_unpack(fmt, data, cur): + size = struct.calcsize(fmt) + if len(data) < cur+size: + raise IOError("Not enough data left") + out = struct.unpack(fmt, data[cur:cur+size]) + return (out, cur+size) -- cgit v1.2.1 From 8b70b9cf6ab28a662bff0b00ece6e7a2924a9e8f Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 20 Feb 2013 10:34:34 -0500 Subject: First pass of cleanup/refactoring Also added a bunch of docstrings --- kafka/NOTES.md | 7 + kafka/client08.py | 672 ++++++++++++++++++++++++++++++++++++------------------ kafka/util.py | 33 ++- 3 files changed, 487 insertions(+), 225 deletions(-) create mode 100644 kafka/NOTES.md (limited to 'kafka') diff --git a/kafka/NOTES.md b/kafka/NOTES.md new file mode 100644 index 0000000..8be6282 --- /dev/null +++ b/kafka/NOTES.md @@ -0,0 +1,7 @@ +For 0.8, we have correlation id so we can potentially interleave requests/responses + +There are a few levels of abstraction: + +* Protocol support: encode/decode the requests/responses +* Socket support: send/recieve messages +* API support: higher level APIs such as: get_topic_metadata diff --git a/kafka/client08.py b/kafka/client08.py index f120f37..11910d1 100644 --- a/kafka/client08.py +++ b/kafka/client08.py @@ -14,69 +14,117 @@ from .codec import snappy_encode, snappy_decode from .util import read_short_string, read_int_string from .util import relative_unpack from .util import write_short_string, write_int_string +from .util import BufferUnderflowError, ChecksumError log = logging.getLogger("kafka") +############### +# Structs # +############### + # Request payloads ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"]) -FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "maxBytes"]) -OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"]) +FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"]) +OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"]) +OffsetCommitRequest = namedtuple("OffsetCommitRequest", ["topic", "partition", "offset", "metadata"]) +OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"]) # Response payloads ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"]) OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offset"]) +OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"]) +OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"]) BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"]) -PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partitionId", "leader", "replicas", "isr"]) +PartitionMetadata = 
namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr"]) # Other useful structs OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"]) Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) -TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partitionId"]) +TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) class ErrorMapping(object): - Unknown = -1 - NoError = 0 - OffsetOutOfRange = 1 - InvalidMessage = 2 - UnknownTopicOrPartition = 3 - InvalidFetchSize = 4 - LeaderNotAvailable = 5 - NotLeaderForPartition = 6 - RequestTimedOut = 7 - BrokerNotAvailable = 8 - ReplicaNotAvailable = 9 - MessageSizeTooLarge = 10 - StaleControllerEpoch = 11 - OffsetMetadataTooLarge = 12 + # Many of these are not actually used by the client + UNKNOWN = -1 + NO_ERROR = 0 + OFFSET_OUT_OF_RANGE = 1 + INVALID_MESSAGE = 2 + UNKNOWN_TOPIC_OR_PARTITON = 3 + INVALID_FETCH_SIZE = 4 + LEADER_NOT_AVAILABLE = 5 + NOT_LEADER_FOR_PARTITION = 6 + REQUEST_TIMED_OUT = 7 + BROKER_NOT_AVAILABLE = 8 + REPLICA_NOT_AVAILABLE = 9 + MESSAGE_SIZE_TO_LARGE = 10 + STALE_CONTROLLER_EPOCH = 11 + OFFSET_METADATA_TOO_LARGE = 12 class KafkaProtocol(object): - PRODUCE_KEY = 0 - FETCH_KEY = 1 - OFFSET_KEY = 2 - METADATA_KEY = 3 + """ + Class to encapsulate all of the protocol encoding/decoding. This class does not + have any state associated with it, it is purely for organization. + """ + PRODUCE_KEY = 0 + FETCH_KEY = 1 + OFFSET_KEY = 2 + METADATA_KEY = 3 + OFFSET_COMMIT_KEY = 6 + OFFSET_FETCH_KEY = 7 ATTRIBUTE_CODEC_MASK = 0x03 + ################### + # Private API # + ################### + @classmethod - def encode_message_header(cls, clientId, correlationId, requestKey): - return struct.pack('>HHiH%ds' % len(clientId), - requestKey, # ApiKey - 0, # ApiVersion - correlationId, # CorrelationId - len(clientId), # - clientId) # ClientId + def _encode_message_header(cls, client_id, correlation_id, request_key): + """ + Encode the common request envelope + """ + return struct.pack('>hhih%ds' % len(client_id), + request_key, # ApiKey + 0, # ApiVersion + correlation_id, # CorrelationId + len(client_id), # + client_id) # ClientId @classmethod - def encode_message_set(cls, messages): + def _encode_message_set(cls, messages): + """ + Encode a MessageSet. Unlike other arrays in the protocol, MessageSets are + not length-prefixed + + Format + ====== + MessageSet => [Offset MessageSize Message] + Offset => int64 + MessageSize => int32 + """ message_set = "" for message in messages: - encoded_message = KafkaProtocol.encode_message(message) + encoded_message = KafkaProtocol._encode_message(message) message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message) return message_set @classmethod - def encode_message(cls, message): + def _encode_message(cls, message): + """ + Encode a single message. + + The magic number of a message is a format version number. 
The only supported + magic number right now is zero + + Format + ====== + Message => Crc MagicByte Attributes Key Value + Crc => int32 + MagicByte => int8 + Attributes => int8 + Key => bytes + Value => bytes + """ if message.magic == 0: msg = struct.pack('>BB', message.magic, message.attributes) msg += write_int_string(message.key) @@ -87,92 +135,237 @@ class KafkaProtocol(object): raise Exception("Unexpected magic number: %d" % message.magic) return msg - @classmethod - def create_message(cls, value): - return Message(0, 0, "foo", value) @classmethod - def create_gzip_message(cls, value): - message_set = KafkaProtocol.encode_message_set([KafkaProtocol.create_message(value)]) - gzipped = gzip_encode(message_set) - return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), "foo", gzipped) - - @classmethod - def decode_message_set_iter(cls, data): + def _decode_message_set_iter(cls, data): """ - Decode a MessageSet, iteratively + Iteratively decode a MessageSet Reads repeated elements of (offset, message), calling decode_message to decode a single message. Since compressed messages contain futher MessageSets, these two methods have been decoupled so that they may recurse easily. - - Format - ====== - MessageSet => [Offset MessageSize Message] - Offset => int64 - MessageSize => int32 - - N.B., the repeating element of the MessageSet is not preceded by an int32 like other - repeating elements in this protocol """ cur = 0 while cur < len(data): - ((offset, ), cur) = relative_unpack('>q', data, cur) - (msg, cur) = read_int_string(data, cur) - for (offset, message) in KafkaProtocol.decode_message(msg, offset): - yield OffsetAndMessage(offset, message) + try: + ((offset, ), cur) = relative_unpack('>q', data, cur) + (msg, cur) = read_int_string(data, cur) + for (offset, message) in KafkaProtocol._decode_message(msg, offset): + yield OffsetAndMessage(offset, message) + except BufferUnderflowError: # If we get a partial read of a message, stop + raise StopIteration() @classmethod - def decode_message(cls, data, offset): + def _decode_message(cls, data, offset): """ Decode a single Message The only caller of this method is decode_message_set_iter. They are decoupled to support nested messages (compressed MessageSets). The offset is actually read from decode_message_set_iter (it is part of the MessageSet payload). 
- - Format - ======== - Message => Crc MagicByte Attributes Key Value - Crc => int32 - MagicByte => int8 - Attributes => int8 - Key => bytes - Value => bytes """ ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0) - assert crc == zlib.crc32(data[4:]) + if crc != zlib.crc32(data[4:]): + raise ChecksumError("Message checksum failed") + (key, cur) = read_int_string(data, cur) (value, cur) = read_int_string(data, cur) if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 0: yield (offset, Message(magic, att, key, value)) elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 1: gz = gzip_decode(value) - for (offset, message) in KafkaProtocol.decode_message_set_iter(gz): + for (offset, message) in KafkaProtocol._decode_message_set_iter(gz): yield (offset, message) elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 2: snp = snappy_decode(value) - for (offset, message) in KafkaProtocol.decode_message_set_iter(snp): + for (offset, message) in KafkaProtocol._decode_message_set_iter(snp): yield (offset, message) + ################## + # Public API # + ################## + @classmethod - def encode_metadata_request(cls, clientId, correlationId, *topics): - # Header - message = cls.encode_message_header(clientId, correlationId, KafkaProtocol.METADATA_KEY) + def create_message(cls, payload, key=None): + """ + Construct a Message - # TopicMetadataRequest + Params + ====== + payload: bytes, the payload to send to Kafka + key: bytes, a key used for partition routing (optional) + """ + return Message(0, 0, key, payload) + + @classmethod + def create_gzip_message(cls, payloads, key=None): + """ + Construct a Gzipped Message containing multiple Messages + + The given payloads will be encoded, compressed, and sent as a single atomic + message to Kafka. + + Params + ====== + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ + message_set = KafkaProtocol._encode_message_set( + [KafkaProtocol.create_message(payload) for payload in payloads]) + gzipped = gzip_encode(message_set) + return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), key, gzipped) + + + @classmethod + def encode_produce_request(self, client_id, correlation_id, payloads=[], acks=1, timeout=1000): + """ + Encode some ProduceRequest structs + + Params + ====== + client_id: string + correlation_id: string + payloads: list of ProduceRequest + acks: How "acky" you want the request to be + 0: immediate response + 1: written to disk by the leader + 2+: waits for this many number of replicas to sync + -1: waits for all replicas to be in sync + timeout: Maximum time the server will wait for acks from replicas. 
This is _not_ a socket timeout + """ + payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) + message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY) + message += struct.pack('>hii', acks, timeout, len(payloads_by_topic)) + for topic, payload in payloads_by_topic: + payloads = list(payloads) + message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads)) + for payload in payloads: + message_set = KafkaProtocol._encode_message_set(payload.messages) + message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set) + return struct.pack('>i%ds' % len(message), len(message), message) + + @classmethod + def decode_produce_response(cls, data): + """ + Decode bytes to a ProduceResponse + + Params + ====== + data: bytes to decode + """ + ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) + for i in range(num_topics): + ((strlen,), cur) = relative_unpack('>h', data, cur) + topic = data[cur:cur+strlen] + cur += strlen + ((num_partitions,), cur) = relative_unpack('>i', data, cur) + for i in range(num_partitions): + ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur) + yield ProduceResponse(topic, partition, error, offset) + + @classmethod + def encode_fetch_request(cls, client_id, correlation_id, payloads=[], max_wait_time=100, min_bytes=4096): + """ + Encodes some FetchRequest structs + + Params + ====== + client_id: string + correlation_id: string + payloads: list of FetchRequest + max_wait_time: int, how long to block waiting on min_bytes of data + min_bytes: int, the minimum number of bytes to accumulate before returning the response + """ + + payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) + message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY) + message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id + for topic, payload in payloads_by_topic: + payloads = list(payloads) + message += write_short_string(topic) + message += struct.pack('>i', len(payloads)) + for payload in payloads: + message += struct.pack('>iqi', payload.partition, payload.offset, payload.max_bytes) + return struct.pack('>i%ds' % len(message), len(message), message) + + @classmethod + def decode_fetch_response_iter(cls, data): + """ + Decode bytes to a FetchResponse + + Params + ====== + data: bytes to decode + """ + ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) + for i in range(num_topics): + (topic, cur) = read_short_string(data, cur) + ((num_partitions,), cur) = relative_unpack('>i', data, cur) + for i in range(num_partitions): + ((partition, error, highwater_mark_offset), cur) = relative_unpack('>ihq', data, cur) + (message_set, cur) = read_int_string(data, cur) + yield FetchResponse(topic, partition, error, highwater_mark_offset, + KafkaProtocol._decode_message_set_iter(message_set)) + + @classmethod + def encode_offset_request(cls, client_id, correlation_id, payloads=[]): + payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) + message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY) + message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id + for topic, payload in payloads_by_topic: + payloads = list(payloads) + message += write_short_string(topic) + message += struct.pack('>i', len(payloads)) + for payload in payloads: + message += struct.pack('>iqi', 
payload.partition, payload.time, payload.max_offsets) + return struct.pack('>i%ds' % len(message), len(message), message) + + @classmethod + def decode_offset_response(cls, data): + """ + Decode bytes to an OffsetResponse + + Params + ====== + data: bytes to decode + """ + ((correlation_id, num_topics), cur) = relative_unpack('>ii', data, 0) + for i in range(num_topics): + (topic, cur) = read_short_string(data, cur) + ((num_partitions,), cur) = relative_unpack('>i', data, cur) + for i in range(num_partitions): + ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur) + yield OffsetResponse(topic, partition, error, offset) + + @classmethod + def encode_metadata_request(cls, client_id, correlation_id, topics=[]): + """ + Encode a MetadataRequest + + Params + ====== + client_id: string + correlation_id: string + topics: list of strings + """ + message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.METADATA_KEY) message += struct.pack('>i', len(topics)) for topic in topics: - message += struct.pack('>H%ds' % len(topic), len(topic), topic) - - # Length-prefix the whole thing + message += struct.pack('>h%ds' % len(topic), len(topic), topic) return write_int_string(message) @classmethod def decode_metadata_response(cls, data): - # TopicMetadataResponse - cur = 0 - ((correlationId, numBrokers), cur) = relative_unpack('>ii', data, cur) + """ + Decode bytes to a MetadataResponse + + Params + ====== + data: bytes to decode + """ + ((correlation_id, numBrokers), cur) = relative_unpack('>ii', data, 0) + + # Broker info brokers = {} for i in range(numBrokers): ((nodeId, ), cur) = relative_unpack('>i', data, cur) @@ -180,145 +373,123 @@ class KafkaProtocol(object): ((port,), cur) = relative_unpack('>i', data, cur) brokers[nodeId] = BrokerMetadata(nodeId, host, port) - ((numTopics,), cur) = relative_unpack('>i', data, cur) + # Topic info + ((num_topics,), cur) = relative_unpack('>i', data, cur) topicMetadata = {} - for i in range(numTopics): - ((topicError,), cur) = relative_unpack('>H', data, cur) + for i in range(num_topics): + ((topicError,), cur) = relative_unpack('>h', data, cur) (topicName, cur) = read_short_string(data, cur) - ((numPartitions,), cur) = relative_unpack('>i', data, cur) + ((num_partitions,), cur) = relative_unpack('>i', data, cur) partitionMetadata = {} - for j in range(numPartitions): - ((partitionErrorCode, partitionId, leader, numReplicas), cur) = relative_unpack('>Hiii', data, cur) + for j in range(num_partitions): + ((partitionErrorCode, partition, leader, numReplicas), cur) = relative_unpack('>hiii', data, cur) (replicas, cur) = relative_unpack('>%di' % numReplicas, data, cur) ((numIsr,), cur) = relative_unpack('>i', data, cur) (isr, cur) = relative_unpack('>%di' % numIsr, data, cur) - partitionMetadata[partitionId] = PartitionMetadata(topicName, partitionId, leader, replicas, isr) + partitionMetadata[partition] = PartitionMetadata(topicName, partition, leader, replicas, isr) topicMetadata[topicName] = partitionMetadata return (brokers, topicMetadata) @classmethod - def encode_produce_request(self, clientId, correlationId, payloads=[], acks=1, timeout=1000): - # Group the payloads by topic - sorted_payloads = sorted(payloads, key=attrgetter("topic")) - grouped_payloads = list(groupby(sorted_payloads, key=attrgetter("topic"))) - - # Pack the message header - message = struct.pack('>HHiH%ds' % len(clientId), - KafkaProtocol.PRODUCE_KEY, # ApiKey - 0, # ApiVersion - correlationId, # CorrelationId - len(clientId), # - clientId) # ClientId - - # 
Pack the message sets - message += struct.pack('>Hii', acks, timeout, len(grouped_payloads)) - for topic, payload in grouped_payloads: - payloads = list(payloads) - message += struct.pack('>H%dsi' % len(topic), len(topic), topic, len(payloads)) - for payload in payloads: - message_set = KafkaProtocol.encode_message_set(payload.messages) - message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set) - - # Length-prefix the whole thing - return struct.pack('>i%ds' % len(message), len(message), message) - - @classmethod - def decode_produce_response(cls, data): - ((correlationId, numTopics), cur) = relative_unpack('>ii', data, 0) - for i in range(numTopics): - ((strlen,), cur) = relative_unpack('>H', data, cur) - topic = data[cur:cur+strlen] - cur += strlen - ((numPartitions,), cur) = relative_unpack('>i', data, cur) - for i in range(numPartitions): - ((partition, error, offset), cur) = relative_unpack('>iHq', data, cur) - yield ProduceResponse(topic, partition, error, offset) + def encode_offset_commit_request(cls, client_id, correlation_id, group, payloads): + """ + Encode some OffsetCommitRequest structs - @classmethod - def encode_fetch_request(cls, clientId, correlationId, payloads=[], replicaId=-1, maxWaitTime=100, minBytes=1024): - # Group the payloads by topic - sorted_payloads = sorted(payloads, key=attrgetter("topic")) - grouped_payloads = list(groupby(sorted_payloads, key=attrgetter("topic"))) - - # Pack the message header - message = struct.pack('>HHiH%ds' % len(clientId), - KafkaProtocol.FETCH_KEY, # ApiKey - 0, # ApiVersion - correlationId, # CorrelationId - len(clientId), # - clientId) # ClientId - - # Pack the FetchRequest - message += struct.pack('>iiii', - replicaId, # ReplicaId - maxWaitTime, # MaxWaitTime - minBytes, # MinBytes - len(grouped_payloads)) - for topic, payload in grouped_payloads: + Params + ====== + client_id: string + correlation_id: string + group: string, the consumer group you are committing offsets for + payloads: list of OffsetCommitRequest + """ + payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) + message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY) + message += write_short_string(group) + message += struct.pack('>i', len(payloads_by_topic)) + for topic, payload in payloads_by_topic: payloads = list(payloads) message += write_short_string(topic) message += struct.pack('>i', len(payloads)) for payload in payloads: - message += struct.pack('>iqi', payload.partition, payload.offset, payload.maxBytes) - - # Length-prefix the whole thing + message += struct.pack('>iq', payload.partition, payload.offset) + message += write_short_string(payload.metadata) return struct.pack('>i%ds' % len(message), len(message), message) @classmethod - def decode_fetch_response_iter(cls, data): - ((correlationId, numTopics), cur) = relative_unpack('>ii', data, 0) - for i in range(numTopics): + def decode_offset_commit_response(cls, data): + """ + Decode bytes to an OffsetCommitResponse + + Params + ====== + data: bytes to decode + """ + ((correlation_id,), cur) = relative_unpack('>i', data, 0) + (client_id, cur) = read_short_string(data, cur) + ((num_topics,), cur) = relative_unpack('>i', data, cur) + for i in range(num_topics): (topic, cur) = read_short_string(data, cur) - ((numPartitions,), cur) = relative_unpack('>i', data, cur) - for i in range(numPartitions): - ((partition, error, highwaterMarkOffset), cur) = relative_unpack('>iHq', data, cur) - (messageSet, cur) = 
read_int_string(data, cur) - yield FetchResponse(topic, partition, error, highwaterMarkOffset, KafkaProtocol.decode_message_set_iter(messageSet)) + ((num_partitions,), cur) = relative_unpack('>i', data, cur) + for i in range(num_partitions): + ((partition, error), cur) = relative_unpack('>ih', data, cur) + yield OffsetCommitResponse(topic, partition, error) @classmethod - def encode_offset_request(cls, clientId, correlationId, payloads=[], replicaId=-1): - # Group the payloads by topic - sorted_payloads = sorted(payloads, key=attrgetter("topic")) - grouped_payloads = list(groupby(sorted_payloads, key=attrgetter("topic"))) - - # Pack the message header - message = struct.pack('>HHiH%ds' % len(clientId), - KafkaProtocol.OFFSET_KEY, # ApiKey - 0, # ApiVersion - correlationId, # CorrelationId - len(clientId), # - clientId) # ClientId - - message += struct.pack('>ii', replicaId, len(grouped_payloads)) - - # Pack the OffsetRequest - for topic, payload in grouped_payloads: + def encode_offset_fetch_request(cls, client_id, correlation_id, group, payloads): + """ + Encode some OffsetFetchRequest structs + + Params + ====== + client_id: string + correlation_id: string + group: string, the consumer group you are fetching offsets for + payloads: list of OffsetFetchRequest + """ + payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) + message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY) + message += write_short_string(group) + message += struct.pack('>i', len(payloads_by_topic)) + for topic, payload in payloads_by_topic: payloads = list(payloads) message += write_short_string(topic) message += struct.pack('>i', len(payloads)) for payload in payloads: - message += struct.pack('>iqi', payload.partition, payload.time, payload.maxOffsets) - - # Length-prefix the whole thing + message += struct.pack('>i', payload.partition) return struct.pack('>i%ds' % len(message), len(message), message) @classmethod - def decode_offset_response(cls, data): - ((correlationId, numTopics), cur) = relative_unpack('>ii', data, 0) - for i in range(numTopics): - (topic, cur) = read_short_string(data, cur) - ((numPartitions,), cur) = relative_unpack('>i', data, cur) - for i in range(numPartitions): - ((partition, error, offset), cur) = relative_unpack('>iHq', data, cur) - yield OffsetResponse(topic, partition, error, offset) + def decode_offset_fetch_response(cls, data): + """ + Decode bytes to an OffsetFetchResponse + Params + ====== + data: bytes to decode + """ + ((correlation_id,), cur) = relative_unpack('>i', data, 0) + (client_id, cur) = read_short_string(data, cur) + ((num_topics,), cur) = relative_unpack('>i', data, cur) + for i in range(num_topics): + (topic, cur) = read_short_string(data, cur) + ((num_partitions,), cur) = relative_unpack('>i', data, cur) + for i in range(num_partitions): + ((partition, offset), cur) = relative_unpack('>iq', data, cur) + (metadata, cur) = read_short_string(data, cur) + ((error,), cur) = relative_unpack('>h', data, cur) + yield OffsetFetchResponse(topic, partition, offset, metadata, error) -class Conn(object): +class KafkaConnection(object): """ A socket connection to a single Kafka broker + + This class is _not_ thread safe. Each call to `send` must be followed + by a call to `recv` in order to get the correct response. Eventually, + we can do something in here to facilitate multiplexed requests/responses + since the Kafka API includes a correlation id. 
""" - def __init__(self, host, port, bufsize=1024): + def __init__(self, host, port, bufsize=4096): self.host = host self.port = port self.bufsize = bufsize @@ -326,8 +497,9 @@ class Conn(object): self._sock.connect((host, port)) self._sock.settimeout(10) - def close(self): - self._sock.close() + ################### + # Private API # + ################### def _consume_response(self): """ @@ -345,7 +517,7 @@ class Conn(object): """ log.debug("Handling response from Kafka") - # Header + # Read the size off of the header resp = self._sock.recv(4) if resp == "": raise Exception("Got no response from Kafka") @@ -354,61 +526,66 @@ class Conn(object): messageSize = size - 4 log.debug("About to read %d bytes from Kafka", messageSize) - # Response iterator + # Read the remainder of the response total = 0 while total < messageSize: resp = self._sock.recv(self.bufsize) log.debug("Read %d bytes from Kafka", len(resp)) if resp == "": - raise Exception("Underflow") + raise BufferUnderflowError("Not enough data to read this response") total += len(resp) yield resp + ################## + # Public API # + ################## + def send(self, requestId, payload): - #print(repr(payload)) + "Send a request to Kafka" sent = self._sock.sendall(payload) if sent == 0: raise RuntimeError("Kafka went away") self.data = self._consume_response() - #print(repr(self.data)) def recv(self, requestId): + "Get a response from Kafka" return self.data -class KafkaConnection(object): - """ - Low-level API for Kafka 0.8 - """ + def close(self): + "Close this connection" + self._sock.close() - # ClientId for Kafka - CLIENT_ID = "kafka-python" +class KafkaClient(object): - # Global correlation ids + CLIENT_ID = "kafka-python" ID_GEN = count() - def __init__(self, host, port, bufsize=1024): + def __init__(self, host, port, bufsize=4096): # We need one connection to bootstrap self.bufsize = bufsize - self.conns = {(host, port): Conn(host, port, bufsize)} - self.brokers = {} # broker Id -> BrokerMetadata - self.topics_to_brokers = {} # topic Id -> broker Id + self.conns = { # (host, port) -> KafkaConnection + (host, port): KafkaConnection(host, port, bufsize) + } + self.brokers = {} # broker_id -> BrokerMetadata + self.topics_to_brokers = {} # topic_id -> broker_id self.load_metadata_for_topics() def get_conn_for_broker(self, broker): "Get or create a connection to a broker" if (broker.host, broker.port) not in self.conns: - self.conns[(broker.host, broker.port)] = Conn(broker.host, broker.port, self.bufsize) + self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize) return self.conns[(broker.host, broker.port)] def next_id(self): - return KafkaConnection.ID_GEN.next() + "Generate a new correlation id" + return KafkaClient.ID_GEN.next() def load_metadata_for_topics(self, *topics): """ Discover brokers and metadata for a set of topics """ requestId = self.next_id() - request = KafkaProtocol.encode_metadata_request(KafkaConnection.CLIENT_ID, requestId, *topics) + request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics) conn = self.conns.values()[0] # Just get the first one in the list conn.send(requestId, request) response = conn.recv(requestId) @@ -435,12 +612,11 @@ class KafkaConnection(object): def send_produce_request(self, payloads=[], fail_on_error=True, callback=None): # Group the produce requests by topic+partition - sorted_payloads = sorted(payloads, key=lambda x: (x.topic, x.partition)) - grouped_payloads = groupby(sorted_payloads, key=lambda x: 
(x.topic, x.partition)) + payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition)) # Group the produce requests by which broker they go to payloads_by_broker = defaultdict(list) - for (topic, partition), payload in grouped_payloads: + for (topic, partition), payload in payloads_by_topic_and_partition: payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload) out = [] @@ -448,7 +624,7 @@ class KafkaConnection(object): for broker, payloads in payloads_by_broker.items(): conn = self.get_conn_for_broker(broker) requestId = self.next_id() - request = KafkaProtocol.encode_produce_request(KafkaConnection.CLIENT_ID, requestId, payloads) + request = KafkaProtocol.encode_produce_request(KafkaClient.CLIENT_ID, requestId, payloads) # Send the request conn.send(requestId, request) response = conn.recv(requestId) @@ -472,12 +648,11 @@ class KafkaConnection(object): brokers. """ # Group the produce requests by topic+partition - sorted_payloads = sorted(payloads, key=lambda x: (x.topic, x.partition)) - grouped_payloads = groupby(sorted_payloads, key=lambda x: (x.topic, x.partition)) + payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition)) # Group the produce requests by which broker they go to payloads_by_broker = defaultdict(list) - for (topic, partition), payload in grouped_payloads: + for (topic, partition), payload in payloads_by_topic_and_partition: payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload) out = [] @@ -485,7 +660,7 @@ class KafkaConnection(object): for broker, payloads in payloads_by_broker.items(): conn = self.get_conn_for_broker(broker) requestId = self.next_id() - request = KafkaProtocol.encode_fetch_request(KafkaConnection.CLIENT_ID, requestId, payloads) + request = KafkaProtocol.encode_fetch_request(KafkaClient.CLIENT_ID, requestId, payloads) # Send the request conn.send(requestId, request) response = conn.recv(requestId) @@ -501,24 +676,87 @@ class KafkaConnection(object): out.append(fetch_response) return out + def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None): + conn = self.conns.values()[0] # Just get the first one in the list + requestId = self.next_id() + request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads) + conn.send(requestId, request) + response = conn.recv(requestId) + out = [] + for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response): + if fail_on_error == True and offset_commit_response.error != 0: + raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error) + if callback is not None: + out.append(callback(offset_commit_response)) + else: + out.append(offset_commit_response) + return out + + def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None): + conn = self.conns.values()[0] # Just get the first one in the list + requestId = self.next_id() + request = KafkaProtocol.encode_offset_fetch_request(KafkaClient.CLIENT_ID, requestId, group, payloads) + conn.send(requestId, request) + response = conn.recv(requestId) + out = [] + for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response): + if fail_on_error == True and offset_fetch_response.error != 0: + raise Exception("OffsetFetchRequest failed with errorcode=%s", offset_fetch_response.error) + if callback is not None: + out.append(callback(offset_fetch_response)) + 
else: + out.append(offset_fetch_response) + return out + + + if __name__ == "__main__": # Bootstrap connection - conn = KafkaConnection("localhost", 9092) + conn = KafkaClient("localhost", 9092) # Create some Messages messages = (KafkaProtocol.create_gzip_message("GZIPPed"), KafkaProtocol.create_message("not-gzipped")) # Create a ProduceRequest - produce = ProduceRequest("foo5", 0, messages) + produce = ProduceRequest(topic="foo5", partition=0, messages=messages) # Send the ProduceRequest - produce_resp = conn.send_produce_request([produce]) + produce_resp = conn.send_produce_request(payloads=[produce]) # Check for errors for resp in produce_resp: if resp.error != 0: raise Exception("ProduceRequest failed with errorcode=%d", resp.error) print resp - + + # Offset commit/fetch + #conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("topic-1", 0, 42, "METADATA?")]) + #conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("topic-1", 0)]) + + print conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("foo5", 0)]) + offset = 0 + done = False + while not done: + print offset + for resp in conn.send_fetch_request(payloads=[FetchRequest(topic="foo5", partition=0, offset=offset, max_bytes=4096)]): + i = 0 + for msg in resp.messages: + print conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("foo5", 0, offset, "")]) + print msg, offset + offset = msg.offset+1 + i += 1 + if i == 0: + raise StopIteration("no more messages") + +class Consumer(object): + def __init__(self, conn): + self._conn = conn + + + +class Producer(object): + pass + + diff --git a/kafka/util.py b/kafka/util.py index 6f27637..0623f35 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,38 +1,55 @@ +from itertools import groupby import struct def write_int_string(s): - return struct.pack('>i%ds' % len(s), len(s), s) + if s is None: + return struct.pack('>i', -1) + else: + return struct.pack('>i%ds' % len(s), len(s), s) def write_short_string(s): - return struct.pack('>H%ds' % len(s), len(s), s) + if s is None: + return struct.pack('>h', -1) + else: + return struct.pack('>h%ds' % len(s), len(s), s) def read_short_string(data, cur): if len(data) < cur+2: - raise IOError("Not enough data left") - (strLen,) = struct.unpack('>H', data[cur:cur+2]) + raise BufferUnderflowError("Not enough data left") + (strLen,) = struct.unpack('>h', data[cur:cur+2]) if strLen == -1: return (None, cur+2) cur += 2 if len(data) < cur+strLen: - raise IOError("Not enough data left") + raise BufferUnderflowError("Not enough data left") out = data[cur:cur+strLen] return (out, cur+strLen) def read_int_string(data, cur): if len(data) < cur+4: - raise IOError("Not enough data left") + raise BufferUnderflowError("Not enough data left") (strLen,) = struct.unpack('>i', data[cur:cur+4]) if strLen == -1: return (None, cur+4) cur += 4 if len(data) < cur+strLen: - raise IOError("Not enough data left") + raise BufferUnderflowError("Not enough data left") out = data[cur:cur+strLen] return (out, cur+strLen) def relative_unpack(fmt, data, cur): size = struct.calcsize(fmt) if len(data) < cur+size: - raise IOError("Not enough data left") + raise BufferUnderflowError("Not enough data left") out = struct.unpack(fmt, data[cur:cur+size]) return (out, cur+size) + +def group_list_by_key(l, key): + sorted_l = sorted(l, key=key) + return list(groupby(sorted_l, key=key)) + +class BufferUnderflowError(Exception): + pass + +class ChecksumError(Exception): + pass -- cgit v1.2.1 From 
e87c561723be25fcfa2564030367196231aa366e Mon Sep 17 00:00:00 2001 From: David Arthur Date: Wed, 20 Feb 2013 22:05:21 -0500 Subject: Fix a bunch of bugs --- kafka/client08.py | 194 +++++++++++++++++++++++++++++++++++++----------------- kafka/util.py | 9 ++- 2 files changed, 138 insertions(+), 65 deletions(-) (limited to 'kafka') diff --git a/kafka/client08.py b/kafka/client08.py index 11910d1..b048d68 100644 --- a/kafka/client08.py +++ b/kafka/client08.py @@ -14,6 +14,7 @@ from .codec import snappy_encode, snappy_decode from .util import read_short_string, read_int_string from .util import relative_unpack from .util import write_short_string, write_int_string +from .util import group_list_by_key from .util import BufferUnderflowError, ChecksumError log = logging.getLogger("kafka") @@ -217,7 +218,7 @@ class KafkaProtocol(object): @classmethod - def encode_produce_request(self, client_id, correlation_id, payloads=[], acks=1, timeout=1000): + def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000): """ Encode some ProduceRequest structs @@ -236,8 +237,7 @@ class KafkaProtocol(object): payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY) message += struct.pack('>hii', acks, timeout, len(payloads_by_topic)) - for topic, payload in payloads_by_topic: - payloads = list(payloads) + for topic, payloads in payloads_by_topic.items(): message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads)) for payload in payloads: message_set = KafkaProtocol._encode_message_set(payload.messages) @@ -280,8 +280,7 @@ class KafkaProtocol(object): payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY) message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id - for topic, payload in payloads_by_topic: - payloads = list(payloads) + for topic, payloads in payloads_by_topic.items(): message += write_short_string(topic) message += struct.pack('>i', len(payloads)) for payload in payloads: @@ -312,8 +311,7 @@ class KafkaProtocol(object): payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY) message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id - for topic, payload in payloads_by_topic: - payloads = list(payloads) + for topic, payloads in payloads_by_topic.items(): message += write_short_string(topic) message += struct.pack('>i', len(payloads)) for payload in payloads: @@ -406,8 +404,7 @@ class KafkaProtocol(object): message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY) message += write_short_string(group) message += struct.pack('>i', len(payloads_by_topic)) - for topic, payload in payloads_by_topic: - payloads = list(payloads) + for topic, payloads in payloads_by_topic.items(): message += write_short_string(topic) message += struct.pack('>i', len(payloads)) for payload in payloads: @@ -450,8 +447,7 @@ class KafkaProtocol(object): message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY) message += write_short_string(group) message += struct.pack('>i', len(payloads_by_topic)) - for topic, payload in payloads_by_topic: - payloads = list(payloads) + for topic, payloads in 
payloads_by_topic.items(): message += write_short_string(topic) message += struct.pack('>i', len(payloads)) for payload in payloads: @@ -582,13 +578,14 @@ class KafkaClient(object): def load_metadata_for_topics(self, *topics): """ - Discover brokers and metadata for a set of topics + Discover brokers and metadata for a set of topics. This method will + recurse in the event of a retry. """ requestId = self.next_id() request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics) - conn = self.conns.values()[0] # Just get the first one in the list - conn.send(requestId, request) - response = conn.recv(requestId) + response = self.try_send_request(requestId, request) + if response is None: + raise Exception("All servers failed to process request") (brokers, topics) = KafkaProtocol.decode_metadata_response(response) log.debug("Broker metadata: %s", brokers) log.debug("Topic metadata: %s", topics) @@ -600,7 +597,6 @@ class KafkaClient(object): log.info("Partition is unassigned, delay for 1s and retry") time.sleep(1) self.load_metadata_for_topics(topic) - return else: self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader] @@ -608,18 +604,44 @@ class KafkaClient(object): key = TopicAndPartition(topic, partition) if key not in self.topics_to_brokers: self.load_metadata_for_topics(topic) + if key not in self.topics_to_brokers: + raise Exception("Partition does not exist: %s" % str(key)) return self.topics_to_brokers[key] def send_produce_request(self, payloads=[], fail_on_error=True, callback=None): + """ + Encode and send some ProduceRequests + + ProduceRequests will be grouped by (topic, partition) and then sent to a specific + broker. Output is a list of responses in the same order as the list of payloads + specified + + Params + ====== + payloads: list of ProduceRequest + fail_on_error: boolean, should we raise an Exception if we encounter an API error? 
+ callback: function, instead of returning the ProduceResponse, first pass it through this function + + Return + ====== + list of ProduceResponse or callback(ProduceResponse), in the order of input payloads + """ + key_fn = lambda x: (x.topic, x.partition) + + # Note the order of the incoming payloads + original_keys = [key_fn(payload) for payload in payloads] + # Group the produce requests by topic+partition - payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition)) + payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn) # Group the produce requests by which broker they go to payloads_by_broker = defaultdict(list) - for (topic, partition), payload in payloads_by_topic_and_partition: - payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload) + for (topic, partition), payloads in payloads_by_topic_and_partition.items(): + payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads + + # Accumulate the responses in a dictionary, keyed by key_fn + acc = {} - out = [] # For each broker, send the list of request payloads for broker, payloads in payloads_by_broker.items(): conn = self.get_conn_for_broker(broker) @@ -635,10 +657,13 @@ class KafkaClient(object): (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error)) # Run the callback if callback is not None: - out.append(callback(produce_response)) + acc[key_fn(produce_response)] = callback(produce_response) else: - out.append(produce_response) - return out + acc[key_fn(produce_response)] = produce_response + + print(acc) + # Order the accumulated responses by the original key order + return (acc[k] for k in original_keys) def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None): """ @@ -647,15 +672,22 @@ class KafkaClient(object): Payloads are grouped by topic and partition so they can be pipelined to the same brokers. 
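
A minimal, self-contained sketch of the group-then-restore-order idea used by send_produce_request and send_fetch_request (illustrative only: the leader lookup is faked with a dict standing in for get_leader_for_partition, and the per-broker network round trip is stubbed out):

    from collections import defaultdict, namedtuple

    FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"])

    # Stand-in for the cluster metadata the client keeps in topics_to_brokers
    leader_for = {("foo", 0): "broker-1", ("foo", 1): "broker-2", ("bar", 0): "broker-1"}

    payloads = [FetchRequest("foo", 0, 0, 4096),
                FetchRequest("bar", 0, 0, 4096),
                FetchRequest("foo", 1, 0, 4096)]

    # 1. Remember the caller's order, then bucket payloads by their leader broker
    original_keys = [(p.topic, p.partition) for p in payloads]
    payloads_by_broker = defaultdict(list)
    for p in payloads:
        payloads_by_broker[leader_for[(p.topic, p.partition)]].append(p)

    # 2. One wire request per broker; responses arrive keyed by (topic, partition)
    #    in whatever order the brokers answer
    acc = {}
    for broker, reqs in payloads_by_broker.items():
        for r in reqs:  # stub: pretend each payload produced one response
            acc[(r.topic, r.partition)] = "response via %s" % broker

    # 3. Hand results back in the order the payloads were given
    print([acc[k] for k in original_keys])

The dictionary-plus-original_keys step is what lets the client pipeline requests per broker while still returning responses in the same order as the input payloads.
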
""" + key_fn = lambda x: (x.topic, x.partition) + + # Note the order of the incoming payloads + original_keys = [key_fn(payload) for payload in payloads] + # Group the produce requests by topic+partition - payloads_by_topic_and_partition = group_list_by_key(payloads, key=lambda x: (x.topic, x.partition)) + payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn) # Group the produce requests by which broker they go to payloads_by_broker = defaultdict(list) - for (topic, partition), payload in payloads_by_topic_and_partition: - payloads_by_broker[self.get_leader_for_partition(topic, partition)] += list(payload) + for (topic, partition), payloads in payloads_by_topic_and_partition.items(): + payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads + + # Accumulate the responses in a dictionary, keyed by key_fn + acc = {} - out = [] # For each broker, send the list of request payloads for broker, payloads in payloads_by_broker.items(): conn = self.get_conn_for_broker(broker) @@ -667,21 +699,41 @@ class KafkaClient(object): for fetch_response in KafkaProtocol.decode_fetch_response_iter(response): # Check for errors if fail_on_error == True and fetch_response.error != 0: - raise Exception("FetchRequest %s failed with errorcode=%d", + raise Exception("FetchRequest %s failed with errorcode=%d" % (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error)) # Run the callback if callback is not None: - out.append(callback(fetch_response)) + acc[key_fn(fetch_response)] = callback(fetch_response) else: - out.append(fetch_response) - return out + acc[key_fn(fetch_response)] = fetch_response + + # Order the accumulated responses by the original key order + return (acc[k] for k in original_keys) + + def try_send_request(self, requestId, request): + """ + Attempt to send a broker-agnostic request to one of the available brokers. + Keep trying until you succeed. 
+ """ + for conn in self.conns.values(): + try: + conn.send(requestId, request) + response = conn.recv(requestId) + return response + except Exception: + log.warning("Could not commit offset to server %s, trying next server", conn) + continue + return None def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None): - conn = self.conns.values()[0] # Just get the first one in the list requestId = self.next_id() request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads) - conn.send(requestId, request) - response = conn.recv(requestId) + response = self.try_send_request(requestId, request) + if response is None: + if fail_on_error is True: + raise Exception("All servers failed to process request") + else: + return None out = [] for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response): if fail_on_error == True and offset_commit_response.error != 0: @@ -693,15 +745,19 @@ class KafkaClient(object): return out def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None): - conn = self.conns.values()[0] # Just get the first one in the list requestId = self.next_id() request = KafkaProtocol.encode_offset_fetch_request(KafkaClient.CLIENT_ID, requestId, group, payloads) - conn.send(requestId, request) - response = conn.recv(requestId) + response = self.try_send_request(requestId, request) + if response is None: + if fail_on_error is True: + raise Exception("All servers failed to process request") + else: + return None out = [] for offset_fetch_response in KafkaProtocol.decode_offset_fetch_response(response): if fail_on_error == True and offset_fetch_response.error != 0: - raise Exception("OffsetFetchRequest failed with errorcode=%s", offset_fetch_response.error) + raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % ( + offset_fetch_response.topic, offset_fetch_response.partition, offset_fetch_response.error)) if callback is not None: out.append(callback(offset_fetch_response)) else: @@ -709,20 +765,22 @@ class KafkaClient(object): return out - if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + + topic = "foo8" # Bootstrap connection conn = KafkaClient("localhost", 9092) # Create some Messages - messages = (KafkaProtocol.create_gzip_message("GZIPPed"), + messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]), KafkaProtocol.create_message("not-gzipped")) - # Create a ProduceRequest - produce = ProduceRequest(topic="foo5", partition=0, messages=messages) + produce1 = ProduceRequest(topic=topic, partition=0, messages=messages) + produce2 = ProduceRequest(topic=topic, partition=1, messages=messages) # Send the ProduceRequest - produce_resp = conn.send_produce_request(payloads=[produce]) + produce_resp = conn.send_produce_request(payloads=[produce1, produce2]) # Check for errors for resp in produce_resp: @@ -734,29 +792,41 @@ if __name__ == "__main__": #conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("topic-1", 0, 42, "METADATA?")]) #conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("topic-1", 0)]) - print conn.send_offset_fetch_request(group="group", payloads=[OffsetFetchRequest("foo5", 0)]) - offset = 0 - done = False - while not done: - print offset - for resp in conn.send_fetch_request(payloads=[FetchRequest(topic="foo5", partition=0, offset=offset, max_bytes=4096)]): + def init_offsets(offset_response): + if offset_response.error not in 
(ErrorMapping.NO_ERROR, ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON): + raise Exception("OffsetFetch failed: %s" % (offset_response)) + elif offset_response.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON: + return 0 + else: + return offset_response.offset + + # Load offsets + (offset1, offset2) = conn.send_offset_fetch_request( + group="group1", + payloads=[OffsetFetchRequest(topic, 0),OffsetFetchRequest(topic, 1)], + fail_on_error=False, + callback=init_offsets + ) + print offset1, offset2 + + while True: + for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=0, offset=offset1, max_bytes=4096)]): i = 0 for msg in resp.messages: - print conn.send_offset_commit_request(group="group", payloads=[OffsetCommitRequest("foo5", 0, offset, "")]) - print msg, offset - offset = msg.offset+1 + print msg + offset1 = msg.offset+1 + print offset1, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 0, offset1, "")]) i += 1 if i == 0: raise StopIteration("no more messages") -class Consumer(object): - def __init__(self, conn): - self._conn = conn - - - -class Producer(object): - pass - - + for resp in conn.send_fetch_request(payloads=[FetchRequest(topic=topic, partition=1, offset=offset2, max_bytes=4096)]): + i = 0 + for msg in resp.messages: + print msg + offset2 = msg.offset+1 + print offset2, conn.send_offset_commit_request(group="group1", payloads=[OffsetCommitRequest(topic, 1, offset2, "")]) + i += 1 + if i == 0: + raise StopIteration("no more messages") diff --git a/kafka/util.py b/kafka/util.py index 0623f35..cb8f7f5 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -44,9 +44,12 @@ def relative_unpack(fmt, data, cur): out = struct.unpack(fmt, data[cur:cur+size]) return (out, cur+size) -def group_list_by_key(l, key): - sorted_l = sorted(l, key=key) - return list(groupby(sorted_l, key=key)) +def group_list_by_key(it, key): + sorted_it = sorted(it, key=key) + out = {} + for k, group in groupby(sorted_it, key=key): + out[k] = list(group) + return out class BufferUnderflowError(Exception): pass -- cgit v1.2.1 From 2a3d231aa61642c57537bc2128dd4f2bd30f35dd Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 22 Feb 2013 23:09:25 -0500 Subject: Protocol and low-level client done, adding tests --- kafka/client08.py | 180 +++++++++++++++++++++++++++++++++--------------------- kafka/util.py | 16 ++--- 2 files changed, 118 insertions(+), 78 deletions(-) (limited to 'kafka') diff --git a/kafka/client08.py b/kafka/client08.py index b048d68..49d786f 100644 --- a/kafka/client08.py +++ b/kafka/client08.py @@ -14,7 +14,7 @@ from .codec import snappy_encode, snappy_decode from .util import read_short_string, read_int_string from .util import relative_unpack from .util import write_short_string, write_int_string -from .util import group_list_by_key +from .util import group_by_topic_and_partition from .util import BufferUnderflowError, ChecksumError log = logging.getLogger("kafka") @@ -33,7 +33,7 @@ OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"]) # Response payloads ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"]) -OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offset"]) +OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"]) OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"]) 
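
The reworked group_list_by_key above materialises groupby's lazy result into a plain dict of key -> list, which is why call sites can iterate it with .items() and walk each group more than once. A quick illustration with made-up requests; the function body is reproduced from the patch so the snippet runs standalone:

    from collections import namedtuple
    from itertools import groupby

    def group_list_by_key(it, key):
        sorted_it = sorted(it, key=key)
        out = {}
        for k, group in groupby(sorted_it, key=key):
            out[k] = list(group)
        return out

    Req = namedtuple("Req", ["topic", "partition"])  # illustrative stand-in
    reqs = [Req("foo", 0), Req("foo", 1), Req("foo", 0), Req("bar", 0)]

    grouped = group_list_by_key(reqs, key=lambda r: (r.topic, r.partition))
    print(grouped[("foo", 0)])  # the two ("foo", 0) requests, as a list

Later in this series the same idea is specialised into group_by_topic_and_partition, a nested dict keyed first by topic and then by partition.
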
OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"]) BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"]) @@ -74,6 +74,9 @@ class KafkaProtocol(object): OFFSET_FETCH_KEY = 7 ATTRIBUTE_CODEC_MASK = 0x03 + CODEC_NONE = 0x00 + CODEC_GZIP = 0x01 + CODEC_SNAPPY = 0x02 ################### # Private API # @@ -171,13 +174,13 @@ class KafkaProtocol(object): (key, cur) = read_int_string(data, cur) (value, cur) = read_int_string(data, cur) - if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 0: + if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE: yield (offset, Message(magic, att, key, value)) - elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 1: + elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP: gz = gzip_decode(value) for (offset, message) in KafkaProtocol._decode_message_set_iter(gz): yield (offset, message) - elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == 2: + elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY: snp = snappy_decode(value) for (offset, message) in KafkaProtocol._decode_message_set_iter(snp): yield (offset, message) @@ -214,8 +217,25 @@ class KafkaProtocol(object): message_set = KafkaProtocol._encode_message_set( [KafkaProtocol.create_message(payload) for payload in payloads]) gzipped = gzip_encode(message_set) - return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & 0x01), key, gzipped) + return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped) + @classmethod + def create_snappy_message(cls, payloads, key=None): + """ + Construct a Snappy Message containing multiple Messages + + The given payloads will be encoded, compressed, and sent as a single atomic + message to Kafka. + + Params + ====== + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ + message_set = KafkaProtocol._encode_message_set( + [KafkaProtocol.create_message(payload) for payload in payloads]) + snapped = snappy_encode(message_set) + return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY), key, snapped) @classmethod def encode_produce_request(cls, client_id, correlation_id, payloads=[], acks=1, timeout=1000): @@ -234,14 +254,14 @@ class KafkaProtocol(object): -1: waits for all replicas to be in sync timeout: Maximum time the server will wait for acks from replicas. 
This is _not_ a socket timeout """ - payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) + grouped_payloads = group_by_topic_and_partition(payloads) message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.PRODUCE_KEY) - message += struct.pack('>hii', acks, timeout, len(payloads_by_topic)) - for topic, payloads in payloads_by_topic.items(): - message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(payloads)) - for payload in payloads: + message += struct.pack('>hii', acks, timeout, len(grouped_payloads)) + for topic, topic_payloads in grouped_payloads.items(): + message += struct.pack('>h%dsi' % len(topic), len(topic), topic, len(topic_payloads)) + for partition, payload in topic_payloads.items(): message_set = KafkaProtocol._encode_message_set(payload.messages) - message += struct.pack('>ii%ds' % len(message_set), payload.partition, len(message_set), message_set) + message += struct.pack('>ii%ds' % len(message_set), partition, len(message_set), message_set) return struct.pack('>i%ds' % len(message), len(message), message) @classmethod @@ -276,15 +296,15 @@ class KafkaProtocol(object): max_wait_time: int, how long to block waiting on min_bytes of data min_bytes: int, the minimum number of bytes to accumulate before returning the response """ - - payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) + + grouped_payloads = group_by_topic_and_partition(payloads) message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.FETCH_KEY) - message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(payloads_by_topic)) # -1 is the replica id - for topic, payloads in payloads_by_topic.items(): + message += struct.pack('>iiii', -1, max_wait_time, min_bytes, len(grouped_payloads)) # -1 is the replica id + for topic, topic_payloads in grouped_payloads.items(): message += write_short_string(topic) - message += struct.pack('>i', len(payloads)) - for payload in payloads: - message += struct.pack('>iqi', payload.partition, payload.offset, payload.max_bytes) + message += struct.pack('>i', len(topic_payloads)) + for partition, payload in topic_payloads.items(): + message += struct.pack('>iqi', partition, payload.offset, payload.max_bytes) return struct.pack('>i%ds' % len(message), len(message), message) @classmethod @@ -308,14 +328,14 @@ class KafkaProtocol(object): @classmethod def encode_offset_request(cls, client_id, correlation_id, payloads=[]): - payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) + grouped_payloads = group_by_topic_and_partition(payloads) message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_KEY) - message += struct.pack('>ii', -1, len(payloads_by_topic)) # -1 is the replica id - for topic, payloads in payloads_by_topic.items(): + message += struct.pack('>ii', -1, len(grouped_payloads)) # -1 is the replica id + for topic, topic_payloads in grouped_payloads.items(): message += write_short_string(topic) - message += struct.pack('>i', len(payloads)) - for payload in payloads: - message += struct.pack('>iqi', payload.partition, payload.time, payload.max_offsets) + message += struct.pack('>i', len(topic_payloads)) + for partition, payload in topic_payloads.items(): + message += struct.pack('>iqi', partition, payload.time, payload.max_offsets) return struct.pack('>i%ds' % len(message), len(message), message) @classmethod @@ -332,8 +352,12 @@ class KafkaProtocol(object): (topic, cur) = read_short_string(data, cur) ((num_partitions,), cur) = 
relative_unpack('>i', data, cur) for i in range(num_partitions): - ((partition, error, offset), cur) = relative_unpack('>ihq', data, cur) - yield OffsetResponse(topic, partition, error, offset) + ((partition, error, num_offsets,), cur) = relative_unpack('>ihi', data, cur) + offsets = [] + for j in range(num_offsets): + ((offset,), cur) = relative_unpack('>q', data, cur) + offsets.append(offset) + yield OffsetResponse(topic, partition, error, tuple(offsets)) @classmethod def encode_metadata_request(cls, client_id, correlation_id, topics=[]): @@ -400,15 +424,15 @@ class KafkaProtocol(object): group: string, the consumer group you are committing offsets for payloads: list of OffsetCommitRequest """ - payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) + grouped_payloads= group_by_topic_and_partition(payloads) message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_COMMIT_KEY) message += write_short_string(group) - message += struct.pack('>i', len(payloads_by_topic)) - for topic, payloads in payloads_by_topic.items(): + message += struct.pack('>i', len(grouped_payloads)) + for topic, topic_payloads in grouped_payloads.items(): message += write_short_string(topic) - message += struct.pack('>i', len(payloads)) - for payload in payloads: - message += struct.pack('>iq', payload.partition, payload.offset) + message += struct.pack('>i', len(topic_payloads)) + for partition, payload in topic_payloads.items(): + message += struct.pack('>iq', partition, payload.offset) message += write_short_string(payload.metadata) return struct.pack('>i%ds' % len(message), len(message), message) @@ -421,6 +445,7 @@ class KafkaProtocol(object): ====== data: bytes to decode """ + data = data[2:] # TODO remove me when versionId is removed ((correlation_id,), cur) = relative_unpack('>i', data, 0) (client_id, cur) = read_short_string(data, cur) ((num_topics,), cur) = relative_unpack('>i', data, cur) @@ -443,15 +468,15 @@ class KafkaProtocol(object): group: string, the consumer group you are fetching offsets for payloads: list of OffsetFetchRequest """ - payloads_by_topic = group_list_by_key(payloads, key=attrgetter("topic")) + grouped_payloads = group_by_topic_and_partition(payloads) message = cls._encode_message_header(client_id, correlation_id, KafkaProtocol.OFFSET_FETCH_KEY) message += write_short_string(group) - message += struct.pack('>i', len(payloads_by_topic)) - for topic, payloads in payloads_by_topic.items(): + message += struct.pack('>i', len(grouped_payloads)) + for topic, topic_payloads in grouped_payloads.items(): message += write_short_string(topic) - message += struct.pack('>i', len(payloads)) - for payload in payloads: - message += struct.pack('>i', payload.partition) + message += struct.pack('>i', len(topic_payloads)) + for partition, payload in topic_payloads.items(): + message += struct.pack('>i', partition) return struct.pack('>i%ds' % len(message), len(message), message) @classmethod @@ -493,6 +518,9 @@ class KafkaConnection(object): self._sock.connect((host, port)) self._sock.settimeout(10) + def __str__(self): + return "" % (self.host, self.port) + ################### # Private API # ################### @@ -536,6 +564,8 @@ class KafkaConnection(object): # Public API # ################## + # TODO multiplex socket communication to allow for multi-threaded clients + def send(self, requestId, payload): "Send a request to Kafka" sent = self._sock.sendall(payload) @@ -566,6 +596,10 @@ class KafkaClient(object): self.topics_to_brokers = {} # topic_id -> 
broker_id self.load_metadata_for_topics() + def close(self): + for conn in self.conns.values(): + conn.close() + def get_conn_for_broker(self, broker): "Get or create a connection to a broker" if (broker.host, broker.port) not in self.conns: @@ -626,20 +660,14 @@ class KafkaClient(object): ====== list of ProduceResponse or callback(ProduceResponse), in the order of input payloads """ - key_fn = lambda x: (x.topic, x.partition) - - # Note the order of the incoming payloads - original_keys = [key_fn(payload) for payload in payloads] - - # Group the produce requests by topic+partition - payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn) - # Group the produce requests by which broker they go to + original_keys = [] payloads_by_broker = defaultdict(list) - for (topic, partition), payloads in payloads_by_topic_and_partition.items(): - payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads + for payload in payloads: + payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)] += payloads + original_keys.append((payload.topic, payload.partition)) - # Accumulate the responses in a dictionary, keyed by key_fn + # Accumulate the responses in a dictionary acc = {} # For each broker, send the list of request payloads @@ -657,11 +685,10 @@ class KafkaClient(object): (TopicAndPartition(produce_response.topic, produce_response.partition), produce_response.error)) # Run the callback if callback is not None: - acc[key_fn(produce_response)] = callback(produce_response) + acc[(produce_response.topic, produce_response.partition)] = callback(produce_response) else: - acc[key_fn(produce_response)] = produce_response + acc[(produce_response.topic, produce_response.partition)] = produce_response - print(acc) # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) @@ -672,20 +699,14 @@ class KafkaClient(object): Payloads are grouped by topic and partition so they can be pipelined to the same brokers. 
""" - key_fn = lambda x: (x.topic, x.partition) - - # Note the order of the incoming payloads - original_keys = [key_fn(payload) for payload in payloads] - - # Group the produce requests by topic+partition - payloads_by_topic_and_partition = group_list_by_key(payloads, key=key_fn) - # Group the produce requests by which broker they go to + original_keys = [] payloads_by_broker = defaultdict(list) - for (topic, partition), payloads in payloads_by_topic_and_partition.items(): - payloads_by_broker[self.get_leader_for_partition(topic, partition)] += payloads + for payload in payloads: + payloads_by_broker[self.get_leader_for_partition(payload.topic, payload.partition)].append(payload) + original_keys.append((payload.topic, payload.partition)) - # Accumulate the responses in a dictionary, keyed by key_fn + # Accumulate the responses in a dictionary, keyed by topic+partition acc = {} # For each broker, send the list of request payloads @@ -703,9 +724,9 @@ class KafkaClient(object): (TopicAndPartition(fetch_response.topic, fetch_response.partition), fetch_response.error)) # Run the callback if callback is not None: - acc[key_fn(fetch_response)] = callback(fetch_response) + acc[(fetch_response.topic, fetch_response.partition)] = callback(fetch_response) else: - acc[key_fn(fetch_response)] = fetch_response + acc[(fetch_response.topic, fetch_response.partition)] = fetch_response # Order the accumulated responses by the original key order return (acc[k] for k in original_keys) @@ -720,11 +741,30 @@ class KafkaClient(object): conn.send(requestId, request) response = conn.recv(requestId) return response - except Exception: - log.warning("Could not commit offset to server %s, trying next server", conn) + except Exception, e: + log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e)) continue return None + def send_offset_request(self, payloads=[], fail_on_error=True, callback=None): + requestId = self.next_id() + request = KafkaProtocol.encode_offset_request(KafkaClient.CLIENT_ID, requestId, payloads) + response = self.try_send_request(requestId, request) + if response is None: + if fail_on_error is True: + raise Exception("All servers failed to process request") + else: + return None + out = [] + for offset_response in KafkaProtocol.decode_offset_response(response): + if fail_on_error == True and offset_response.error != 0: + raise Exception("OffsetRequest failed with errorcode=%s", offset_response.error) + if callback is not None: + out.append(callback(offset_response)) + else: + out.append(offset_response) + return out + def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None): requestId = self.next_id() request = KafkaProtocol.encode_offset_commit_request(KafkaClient.CLIENT_ID, requestId, group, payloads) @@ -737,6 +777,7 @@ class KafkaClient(object): out = [] for offset_commit_response in KafkaProtocol.decode_offset_commit_response(response): if fail_on_error == True and offset_commit_response.error != 0: + print(offset_commit_response) raise Exception("OffsetCommitRequest failed with errorcode=%s", offset_commit_response.error) if callback is not None: out.append(callback(offset_commit_response)) @@ -770,7 +811,7 @@ if __name__ == "__main__": topic = "foo8" # Bootstrap connection - conn = KafkaClient("localhost", 9092) + conn = KafkaClient("localhost", 49720) # Create some Messages messages = (KafkaProtocol.create_gzip_message(["GZIPPed"]), @@ -799,7 +840,6 @@ if __name__ == "__main__": return 0 else: return 
offset_response.offset - # Load offsets (offset1, offset2) = conn.send_offset_fetch_request( group="group1", diff --git a/kafka/util.py b/kafka/util.py index cb8f7f5..509c5b8 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,15 +1,16 @@ +from collections import defaultdict from itertools import groupby import struct def write_int_string(s): if s is None: - return struct.pack('>i', -1) + return struct.pack('>i', 0) # TODO change this to -1 when KAFKA-771 is accepted else: return struct.pack('>i%ds' % len(s), len(s), s) def write_short_string(s): if s is None: - return struct.pack('>h', -1) + return struct.pack('>h', 0) # TODO change this to -1 when KAFKA-771 is accepted else: return struct.pack('>h%ds' % len(s), len(s), s) @@ -44,12 +45,11 @@ def relative_unpack(fmt, data, cur): out = struct.unpack(fmt, data[cur:cur+size]) return (out, cur+size) -def group_list_by_key(it, key): - sorted_it = sorted(it, key=key) - out = {} - for k, group in groupby(sorted_it, key=key): - out[k] = list(group) - return out +def group_by_topic_and_partition(tuples): + out = defaultdict(dict) + for t in tuples: + out[t.topic][t.partition] = t + return out class BufferUnderflowError(Exception): pass -- cgit v1.2.1 From eac51e9c68c50f15962b6c785ede92cb3d512a17 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 1 Mar 2013 15:12:21 -0500 Subject: Integration tests passing --- kafka/__init__.py | 3 +- kafka/client.py | 1239 ++++++++++++++++++++++++++++++++--------------------- kafka/client08.py | 872 ------------------------------------- kafka/util.py | 4 +- 4 files changed, 744 insertions(+), 1374 deletions(-) delete mode 100644 kafka/client08.py (limited to 'kafka') diff --git a/kafka/__init__.py b/kafka/__init__.py index 1dcae86..166eeb9 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -5,8 +5,7 @@ __license__ = 'Apache License 2.0' __copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0' from .client import ( - KafkaClient, KafkaException, - Message, ProduceRequest, FetchRequest, OffsetRequest + KafkaClient ) from .codec import gzip_encode, gzip_decode from .codec import snappy_encode, snappy_decode diff --git a/kafka/client.py b/kafka/client.py index c25e2d2..5da3919 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -1,631 +1,874 @@ -from collections import namedtuple +import base64 +from collections import namedtuple, defaultdict +from functools import partial +from itertools import groupby, count import logging +from operator import attrgetter import socket import struct +import time import zlib from .codec import gzip_encode, gzip_decode from .codec import snappy_encode, snappy_decode +from .util import read_short_string, read_int_string +from .util import relative_unpack +from .util import write_short_string, write_int_string +from .util import group_by_topic_and_partition +from .util import BufferUnderflowError, ChecksumError log = logging.getLogger("kafka") -error_codes = { - -1: "UnknownError", - 0: None, - 1: "OffsetOutOfRange", - 2: "InvalidMessage", - 3: "WrongPartition", - 4: "InvalidFetchSize" -} - -class KafkaException(Exception): - def __init__(self, errorType): - self.errorType = errorType - def __str__(self): - return str(self.errorType) +############### +# Structs # +############### -Message = namedtuple("Message", ["magic", "attributes", "crc", "payload"]) -FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"]) +# Request payloads ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"]) -OffsetRequest = 
namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"]) - -def length_prefix_message(msg): - """ - Prefix a message with it's length as an int - """ - return struct.pack('>i', len(msg)) + msg - -class KafkaClient(object): +FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "max_bytes"]) +OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "max_offsets"]) +OffsetCommitRequest = namedtuple("OffsetCommitRequest", ["topic", "partition", "offset", "metadata"]) +OffsetFetchRequest = namedtuple("OffsetFetchRequest", ["topic", "partition"]) + +# Response payloads +ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) +FetchResponse = namedtuple("FetchResponse", ["topic", "partition", "error", "highwaterMark", "messages"]) +OffsetResponse = namedtuple("OffsetResponse", ["topic", "partition", "error", "offsets"]) +OffsetCommitResponse = namedtuple("OffsetCommitResponse", ["topic", "partition", "error"]) +OffsetFetchResponse = namedtuple("OffsetFetchResponse", ["topic", "partition", "offset", "metadata", "error"]) +BrokerMetadata = namedtuple("BrokerMetadata", ["nodeId", "host", "port"]) +PartitionMetadata = namedtuple("PartitionMetadata", ["topic", "partition", "leader", "replicas", "isr"]) + +# Other useful structs +OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"]) +Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) +TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) + +class ErrorMapping(object): + # Many of these are not actually used by the client + UNKNOWN = -1 + NO_ERROR = 0 + OFFSET_OUT_OF_RANGE = 1 + INVALID_MESSAGE = 2 + UNKNOWN_TOPIC_OR_PARTITON = 3 + INVALID_FETCH_SIZE = 4 + LEADER_NOT_AVAILABLE = 5 + NOT_LEADER_FOR_PARTITION = 6 + REQUEST_TIMED_OUT = 7 + BROKER_NOT_AVAILABLE = 8 + REPLICA_NOT_AVAILABLE = 9 + MESSAGE_SIZE_TO_LARGE = 10 + STALE_CONTROLLER_EPOCH = 11 + OFFSET_METADATA_TOO_LARGE = 12 + +class KafkaProtocol(object): """ - Request Structure - ================= - - ::= - ::= - ::= 0 | 1 | 2 | 3 | 4 - ::= | | | | - - Response Structure - ================== - - ::= - ::= - ::= -1 | 0 | 1 | 2 | 3 | 4 - ::= | | | | - - Messages are big-endian byte order + Class to encapsulate all of the protocol encoding/decoding. This class does not + have any state associated with it, it is purely for organization. """ - - PRODUCE_KEY = 0 - FETCH_KEY = 1 - MULTIFETCH_KEY = 2 - MULTIPRODUCE_KEY = 3 - OFFSET_KEY = 4 + PRODUCE_KEY = 0 + FETCH_KEY = 1 + OFFSET_KEY = 2 + METADATA_KEY = 3 + OFFSET_COMMIT_KEY = 6 + OFFSET_FETCH_KEY = 7 ATTRIBUTE_CODEC_MASK = 0x03 + CODEC_NONE = 0x00 + CODEC_GZIP = 0x01 + CODEC_SNAPPY = 0x02 - def __init__(self, host, port, bufsize=1024): - self.host = host - self.port = port - self.bufsize = bufsize - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((host, port)) - self._sock.settimeout(10) - log.debug("Connected to %s on %d", host, port) - - def __copy__(self): - return KafkaClient(self.host, self.port, self.bufsize) + ################### + # Private API # + ################### - ###################### - # Protocol Stuff # - ###################### - - def _consume_response_iter(self): + @classmethod + def _encode_message_header(cls, client_id, correlation_id, request_key): """ - This method handles the response header and error messages. 
It - then returns an iterator for the chunks of the response + Encode the common request envelope """ - log.debug("Handling response from Kafka") - # Header - resp = self._sock.recv(6) - if resp == "": - raise Exception("Got no response from Kafka") - (size, err) = struct.unpack('>iH', resp) - - log.debug("About to read %d bytes from Kafka", size-2) - # Handle error - error = error_codes.get(err) - if error is not None: - raise KafkaException(error) - - # Response iterator - total = 0 - while total < (size-2): - resp = self._sock.recv(self.bufsize) - log.debug("Read %d bytes from Kafka", len(resp)) - if resp == "": - raise Exception("Underflow") - total += len(resp) - yield resp + return struct.pack('>hhih%ds' % len(client_id), + request_key, # ApiKey + 0, # ApiVersion + correlation_id, # CorrelationId + len(client_id), # + client_id) # ClientId - def _consume_response(self): + @classmethod + def _encode_message_set(cls, messages): """ - Fully consumer the response iterator + Encode a MessageSet. Unlike other arrays in the protocol, MessageSets are + not length-prefixed + + Format + ====== + MessageSet => [Offset MessageSize Message] + Offset => int64 + MessageSize => int32 """ - data = "" - for chunk in self._consume_response_iter(): - data += chunk - return data + message_set = "" + for message in messages: + encoded_message = KafkaProtocol._encode_message(message) + message_set += struct.pack('>qi%ds' % len(encoded_message), 0, len(encoded_message), encoded_message) + return message_set @classmethod - def encode_message(cls, message): + def _encode_message(cls, message): """ - Encode a Message from a Message tuple + Encode a single message. - Params + The magic number of a message is a format version number. The only supported + magic number right now is zero + + Format ====== - message: Message - - Wire Format - =========== - ::= | - ::= 0 - ::= 1 - ::= - ::= - ::= - ::= - ::= - ::= - - The crc is a crc32 checksum of the message payload. The attributes are bitmask - used for indicating the compression algorithm. + Message => Crc MagicByte Attributes Key Value + Crc => int32 + MagicByte => int8 + Attributes => int8 + Key => bytes + Value => bytes """ if message.magic == 0: - msg = struct.pack('>Bi%ds' % len(message.payload), - message.magic, message.crc, message.payload) - elif message.magic == 1: - msg = struct.pack('>BBi%ds' % len(message.payload), - message.magic, message.attributes, message.crc, message.payload) + msg = struct.pack('>BB', message.magic, message.attributes) + msg += write_int_string(message.key) + msg += write_int_string(message.value) + crc = zlib.crc32(msg) + msg = struct.pack('>i%ds' % len(msg), crc, msg) else: raise Exception("Unexpected magic number: %d" % message.magic) - msg = length_prefix_message(msg) - log.debug("Encoded %s as %r" % (message, msg)) return msg + @classmethod - def encode_message_set(cls, messages): + def _decode_message_set_iter(cls, data): """ - Encode a MessageSet + Iteratively decode a MessageSet - One or more concatenated Messages + Reads repeated elements of (offset, message), calling decode_message to decode a + single message. Since compressed messages contain futher MessageSets, these two methods + have been decoupled so that they may recurse easily. 
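
The reason the two decoders are decoupled is that a compressed Message's value is itself a complete, encoded MessageSet, so decoding one message can drop you straight back into message-set decoding. A toy illustration of that mutual recursion (simplified framing, with zlib in place of the gzip/snappy codecs; this is not the real Kafka wire format):

    import struct
    import zlib

    CODEC_NONE, CODEC_GZIP = 0x00, 0x01

    def encode_plain(payload):
        # toy framing: [attributes:int8][length:int32][payload]
        return struct.pack(">Bi", CODEC_NONE, len(payload)) + payload

    def encode_compressed(payloads):
        inner = b"".join(encode_plain(p) for p in payloads)  # a nested "message set"
        blob = zlib.compress(inner)
        return struct.pack(">Bi", CODEC_GZIP, len(blob)) + blob

    def decode_set(data):
        cur = 0
        while cur < len(data):
            (att, size) = struct.unpack_from(">Bi", data, cur)
            cur += struct.calcsize(">Bi")
            value = data[cur:cur + size]
            cur += size
            if att == CODEC_NONE:
                yield value
            else:
                # recurse into the nested set, as the client does for gzip/snappy
                for payload in decode_set(zlib.decompress(value)):
                    yield payload

    batch = encode_plain(b"plain") + encode_compressed([b"a", b"b"])
    print(list(decode_set(batch)))  # recovers "plain", "a", "b"
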
""" - message_set = "" - for message in messages: - encoded_message = cls.encode_message(message) - message_set += encoded_message - return message_set + cur = 0 + while cur < len(data): + try: + ((offset, ), cur) = relative_unpack('>q', data, cur) + (msg, cur) = read_int_string(data, cur) + for (offset, message) in KafkaProtocol._decode_message(msg, offset): + yield OffsetAndMessage(offset, message) + except BufferUnderflowError: # If we get a partial read of a message, stop + raise StopIteration() @classmethod - def encode_produce_request(cls, produceRequest): - """ - Encode a ProduceRequest - - Wire Format - =========== - ::= - ::= 0 - ::= - ::= - ::= - ::= - - The request-key (0) is encoded as a short (int16). len is the length of the proceeding MessageSet - """ - (topic, partition, messages) = produceRequest - message_set = cls.encode_message_set(messages) - log.debug("Sending MessageSet: %r" % message_set) - req = struct.pack('>HH%dsii%ds' % (len(topic), len(message_set)), - KafkaClient.PRODUCE_KEY, len(topic), topic, partition, len(message_set), message_set) - return req + def _decode_message(cls, data, offset): + """ + Decode a single Message + + The only caller of this method is decode_message_set_iter. They are decoupled to + support nested messages (compressed MessageSets). The offset is actually read from + decode_message_set_iter (it is part of the MessageSet payload). + """ + ((crc, magic, att), cur) = relative_unpack('>iBB', data, 0) + if crc != zlib.crc32(data[4:]): + raise ChecksumError("Message checksum failed") + + (key, cur) = read_int_string(data, cur) + (value, cur) = read_int_string(data, cur) + if att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_NONE: + yield (offset, Message(magic, att, key, value)) + elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_GZIP: + gz = gzip_decode(value) + for (offset, message) in KafkaProtocol._decode_message_set_iter(gz): + yield (offset, message) + elif att & KafkaProtocol.ATTRIBUTE_CODEC_MASK == KafkaProtocol.CODEC_SNAPPY: + snp = snappy_decode(value) + for (offset, message) in KafkaProtocol._decode_message_set_iter(snp): + yield (offset, message) + + ################## + # Public API # + ################## @classmethod - def encode_multi_produce_request(cls, produceRequests): + def create_message(cls, payload, key=None): """ - Encode a MultiProducerRequest + Construct a Message Params ====== - produceRequest: list of ProduceRequest objects - - Returns - ======= - Encoded request - - Wire Format - =========== - ::= - ::= - ::= [ ] - ::= - ::= - ::= - ::= - ::= - - num is the number of ProduceRequests being encoded - """ - req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests)) - for (topic, partition, messages) in produceRequests: - message_set = cls.encode_message_set(messages) - req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)), - len(topic), topic, partition, len(message_set), message_set) - return req - - @classmethod - def encode_fetch_request(cls, fetchRequest): - """ - Encode a FetchRequest message - - Wire Format - =========== - ::= - ::= 1 - ::= - ::= - ::= - ::= - ::= - - The request-key (1) is encoded as a short (int16). 
- """ - (topic, partition, offset, size) = fetchRequest - req = struct.pack('>HH%dsiqi' % len(topic), - KafkaClient.FETCH_KEY, len(topic), topic, partition, offset, size) - return req + payload: bytes, the payload to send to Kafka + key: bytes, a key used for partition routing (optional) + """ + return Message(0, 0, key, payload) @classmethod - def encode_multi_fetch_request(cls, fetchRequests): + def create_gzip_message(cls, payloads, key=None): """ - Encode the MultiFetchRequest message from a list of FetchRequest objects + Construct a Gzipped Message containing multiple Messages + + The given payloads will be encoded, compressed, and sent as a single atomic + message to Kafka. Params ====== - fetchRequests: list of FetchRequest - - Returns - ======= - req: bytes, The message to send to Kafka - - Wire Format - =========== - ::= [ ] - ::= 2 - ::= - ::= [ ] - ::= - ::= - ::= - ::= - ::= - ::= - - The request-key (2) is encoded as a short (int16). - """ - req = struct.pack('>HH', KafkaClient.MULTIFETCH_KEY, len(fetchRequests)) - for (topic, partition, offset, size) in fetchRequests: - req += struct.pack('>H%dsiqi' % len(topic), len(topic), topic, partition, offset, size) - return req + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + """ + message_set = KafkaProtocol._encode_message_set( + [KafkaProtocol.create_message(payload) for payload in payloads]) + gzipped = gzip_encode(message_set) + return Message(0, 0x00 | (KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP), key, gzipped) @classmethod - def encode_offset_request(cls, offsetRequest): + def create_snappy_message(cls, payloads, key=None): """ - Encode an OffsetRequest message + Construct a Snappy Message containing multiple Messages - Wire Format - =========== - ::=