diff options
author | Dana Powers <dana.powers@gmail.com> | 2014-03-21 23:15:28 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2014-03-21 23:15:28 -0700 |
commit | 9bed11db98387c0d9e456528130b330631dc50af (patch) | |
tree | 6c054286a388df1647e3f81a4a85130742ba83ca | |
parent | e937e3f971f5958c8da6249b08288aafd5ed5bcd (diff) | |
parent | a6fc260f288ac639070783a0f6faa94bd7612c67 (diff) | |
download | kafka-python-9bed11db98387c0d9e456528130b330631dc50af.tar.gz |
Merge pull request #134 from wizzat/conn_refactor (tag: v0.9.0)
conn.py performance improvements, make examples work, add another example
-rwxr-xr-x[-rw-r--r--] | example.py | 51 | ||||
-rw-r--r-- | kafka/client.py | 6 | ||||
-rw-r--r-- | kafka/common.py | 36 | ||||
-rw-r--r-- | kafka/conn.py | 19 | ||||
-rwxr-xr-x | load_example.py | 60 |
5 files changed, 133 insertions, 39 deletions
diff --git a/example.py b/example.py index 0cf5583..062761b 100644..100755 --- a/example.py +++ b/example.py @@ -1,23 +1,48 @@ -import logging +#!/usr/bin/env python +import threading, logging, time -from kafka.client import KafkaClient, FetchRequest, ProduceRequest +from kafka.client import KafkaClient from kafka.consumer import SimpleConsumer from kafka.producer import SimpleProducer -def produce_example(client): - producer = SimpleProducer(client, "my-topic") - producer.send_messages("test") +class Producer(threading.Thread): + daemon = True -def consume_example(client): - consumer = SimpleConsumer(client, "test-group", "my-topic") - for message in consumer: - print(message) + def run(self): + client = KafkaClient("localhost:9092") + producer = SimpleProducer(client) + + while True: + producer.send_messages('my-topic', "test") + producer.send_messages('my-topic', "\xc2Hola, mundo!") + + time.sleep(1) + + +class Consumer(threading.Thread): + daemon = True + + def run(self): + client = KafkaClient("localhost:9092") + consumer = SimpleConsumer(client, "test-group", "my-topic") + + for message in consumer: + print(message) def main(): - client = KafkaClient("localhost:9092") - produce_example(client) - consume_example(client) + threads = [ + Producer(), + Consumer() + ] + + for t in threads: + t.start() + + time.sleep(5) if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig( + format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', + level=logging.DEBUG + ) main() diff --git a/kafka/client.py b/kafka/client.py index ab0eb8d..39c89ba 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -5,7 +5,7 @@ from collections import defaultdict from functools import partial from itertools import count -from kafka.common import (ErrorMapping, TopicAndPartition, +from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition, ConnectionError, FailedPayloadsError, BrokerResponseError, 
PartitionUnavailableError, LeaderUnavailableError, @@ -199,8 +199,8 @@ class KafkaClient(object): self.reset_topic_metadata(resp.topic) raise BrokerResponseError( - "Request for %s failed with errorcode=%d" % - (TopicAndPartition(resp.topic, resp.partition), resp.error)) + "Request for %s failed with errorcode=%d (%s)" % + (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error])) ################# # Public API # diff --git a/kafka/common.py b/kafka/common.py index b4fe5c7..005e6dd 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -48,22 +48,28 @@ Message = namedtuple("Message", ["magic", "attributes", "key", "value"]) TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) +ErrorStrings = { + -1 : 'UNKNOWN', + 0 : 'NO_ERROR', + 1 : 'OFFSET_OUT_OF_RANGE', + 2 : 'INVALID_MESSAGE', + 3 : 'UNKNOWN_TOPIC_OR_PARTITON', + 4 : 'INVALID_FETCH_SIZE', + 5 : 'LEADER_NOT_AVAILABLE', + 6 : 'NOT_LEADER_FOR_PARTITION', + 7 : 'REQUEST_TIMED_OUT', + 8 : 'BROKER_NOT_AVAILABLE', + 9 : 'REPLICA_NOT_AVAILABLE', + 10 : 'MESSAGE_SIZE_TOO_LARGE', + 11 : 'STALE_CONTROLLER_EPOCH', + 12 : 'OFFSET_METADATA_TOO_LARGE', +} + class ErrorMapping(object): - # Many of these are not actually used by the client - UNKNOWN = -1 - NO_ERROR = 0 - OFFSET_OUT_OF_RANGE = 1 - INVALID_MESSAGE = 2 - UNKNOWN_TOPIC_OR_PARTITON = 3 - INVALID_FETCH_SIZE = 4 - LEADER_NOT_AVAILABLE = 5 - NOT_LEADER_FOR_PARTITION = 6 - REQUEST_TIMED_OUT = 7 - BROKER_NOT_AVAILABLE = 8 - REPLICA_NOT_AVAILABLE = 9 - MESSAGE_SIZE_TO_LARGE = 10 - STALE_CONTROLLER_EPOCH = 11 - OFFSET_METADATA_TOO_LARGE = 12 + pass + +for k, v in ErrorStrings.items(): + setattr(ErrorMapping, v, k) ################# # Exceptions # diff --git a/kafka/conn.py b/kafka/conn.py index 7538e8d..4fdeb17 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -54,11 +54,10 @@ class KafkaConnection(local): super(KafkaConnection, self).__init__() self.host = host self.port = port - self._sock = socket.socket(socket.AF_INET, 
socket.SOCK_STREAM) - self._sock.connect((host, port)) self.timeout = timeout - self._sock.settimeout(self.timeout) - self._dirty = False + self._sock = None + + self.reinit() def __repr__(self): return "<KafkaConnection host=%s port=%d>" % (self.host, self.port) @@ -73,24 +72,28 @@ class KafkaConnection(local): def _read_bytes(self, num_bytes): bytes_left = num_bytes - resp = '' + responses = [] + log.debug("About to read %d bytes from Kafka", num_bytes) if self._dirty: self.reinit() + while bytes_left: try: - data = self._sock.recv(bytes_left) + data = self._sock.recv(min(bytes_left, 4096)) except socket.error: log.exception('Unable to receive data from Kafka') self._raise_connection_error() + if data == '': log.error("Not enough data to read this response") self._raise_connection_error() + bytes_left -= len(data) log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) - resp += data + responses.append(data) - return resp + return ''.join(responses) ################## # Public API # diff --git a/load_example.py b/load_example.py new file mode 100755 index 0000000..1f8b418 --- /dev/null +++ b/load_example.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +import threading, logging, time, collections + +from kafka.client import KafkaClient +from kafka.consumer import SimpleConsumer +from kafka.producer import SimpleProducer + +msg_size = 524288 + +class Producer(threading.Thread): + daemon = True + big_msg = "1" * msg_size + + def run(self): + client = KafkaClient("localhost:9092") + producer = SimpleProducer(client) + self.sent = 0 + + while True: + producer.send_messages('my-topic', self.big_msg) + self.sent += 1 + + +class Consumer(threading.Thread): + daemon = True + + def run(self): + client = KafkaClient("localhost:9092") + consumer = SimpleConsumer(client, "test-group", "my-topic", + max_buffer_size = None, + ) + self.valid = 0 + self.invalid = 0 + + for message in consumer: + if len(message.message.value) == msg_size: + self.valid += 1 + else: 
+ self.invalid += 1 + +def main(): + threads = [ + Producer(), + Consumer() + ] + + for t in threads: + t.start() + + time.sleep(10) + print 'Messages sent: %d' % threads[0].sent + print 'Messages recvd: %d' % threads[1].valid + print 'Messages invalid: %d' % threads[1].invalid + +if __name__ == "__main__": + logging.basicConfig( + format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', + level=logging.DEBUG + ) + main() |