summary refs log tree commit diff
diff options
context:
space:
mode:
author Dana Powers <dana.powers@gmail.com> 2014-03-21 23:15:28 -0700
committer Dana Powers <dana.powers@gmail.com> 2014-03-21 23:15:28 -0700
commit 9bed11db98387c0d9e456528130b330631dc50af (patch)
tree 6c054286a388df1647e3f81a4a85130742ba83ca
parent e937e3f971f5958c8da6249b08288aafd5ed5bcd (diff)
parent a6fc260f288ac639070783a0f6faa94bd7612c67 (diff)
download kafka-python-9bed11db98387c0d9e456528130b330631dc50af.tar.gz
Merge pull request #134 from wizzat/conn_refactor [tag: v0.9.0]
conn.py performance improvements, make examples work, add another example
-rwxr-xr-x [-rw-r--r--] example.py 51
-rw-r--r-- kafka/client.py 6
-rw-r--r-- kafka/common.py 36
-rw-r--r-- kafka/conn.py 19
-rwxr-xr-x load_example.py 60
5 files changed, 133 insertions, 39 deletions
diff --git a/example.py b/example.py
index 0cf5583..062761b 100644..100755
--- a/example.py
+++ b/example.py
@@ -1,23 +1,48 @@
-import logging
+#!/usr/bin/env python
+import threading, logging, time
-from kafka.client import KafkaClient, FetchRequest, ProduceRequest
+from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer
-def produce_example(client):
- producer = SimpleProducer(client, "my-topic")
- producer.send_messages("test")
+class Producer(threading.Thread):
+ daemon = True
-def consume_example(client):
- consumer = SimpleConsumer(client, "test-group", "my-topic")
- for message in consumer:
- print(message)
+ def run(self):
+ client = KafkaClient("localhost:9092")
+ producer = SimpleProducer(client)
+
+ while True:
+ producer.send_messages('my-topic', "test")
+ producer.send_messages('my-topic', "\xc2Hola, mundo!")
+
+ time.sleep(1)
+
+
+class Consumer(threading.Thread):
+ daemon = True
+
+ def run(self):
+ client = KafkaClient("localhost:9092")
+ consumer = SimpleConsumer(client, "test-group", "my-topic")
+
+ for message in consumer:
+ print(message)
def main():
- client = KafkaClient("localhost:9092")
- produce_example(client)
- consume_example(client)
+ threads = [
+ Producer(),
+ Consumer()
+ ]
+
+ for t in threads:
+ t.start()
+
+ time.sleep(5)
if __name__ == "__main__":
- logging.basicConfig(level=logging.DEBUG)
+ logging.basicConfig(
+ format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+ level=logging.DEBUG
+ )
main()
diff --git a/kafka/client.py b/kafka/client.py
index ab0eb8d..39c89ba 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -5,7 +5,7 @@ from collections import defaultdict
from functools import partial
from itertools import count
-from kafka.common import (ErrorMapping, TopicAndPartition,
+from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
ConnectionError, FailedPayloadsError,
BrokerResponseError, PartitionUnavailableError,
LeaderUnavailableError,
@@ -199,8 +199,8 @@ class KafkaClient(object):
self.reset_topic_metadata(resp.topic)
raise BrokerResponseError(
- "Request for %s failed with errorcode=%d" %
- (TopicAndPartition(resp.topic, resp.partition), resp.error))
+ "Request for %s failed with errorcode=%d (%s)" %
+ (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
#################
# Public API #
diff --git a/kafka/common.py b/kafka/common.py
index b4fe5c7..005e6dd 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -48,22 +48,28 @@ Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
+ErrorStrings = {
+ -1 : 'UNKNOWN',
+ 0 : 'NO_ERROR',
+ 1 : 'OFFSET_OUT_OF_RANGE',
+ 2 : 'INVALID_MESSAGE',
+ 3 : 'UNKNOWN_TOPIC_OR_PARTITON',
+ 4 : 'INVALID_FETCH_SIZE',
+ 5 : 'LEADER_NOT_AVAILABLE',
+ 6 : 'NOT_LEADER_FOR_PARTITION',
+ 7 : 'REQUEST_TIMED_OUT',
+ 8 : 'BROKER_NOT_AVAILABLE',
+ 9 : 'REPLICA_NOT_AVAILABLE',
+ 10 : 'MESSAGE_SIZE_TOO_LARGE',
+ 11 : 'STALE_CONTROLLER_EPOCH',
+ 12 : 'OFFSET_METADATA_TOO_LARGE',
+}
+
class ErrorMapping(object):
- # Many of these are not actually used by the client
- UNKNOWN = -1
- NO_ERROR = 0
- OFFSET_OUT_OF_RANGE = 1
- INVALID_MESSAGE = 2
- UNKNOWN_TOPIC_OR_PARTITON = 3
- INVALID_FETCH_SIZE = 4
- LEADER_NOT_AVAILABLE = 5
- NOT_LEADER_FOR_PARTITION = 6
- REQUEST_TIMED_OUT = 7
- BROKER_NOT_AVAILABLE = 8
- REPLICA_NOT_AVAILABLE = 9
- MESSAGE_SIZE_TO_LARGE = 10
- STALE_CONTROLLER_EPOCH = 11
- OFFSET_METADATA_TOO_LARGE = 12
+ pass
+
+for k, v in ErrorStrings.items():
+ setattr(ErrorMapping, v, k)
#################
# Exceptions #
diff --git a/kafka/conn.py b/kafka/conn.py
index 7538e8d..4fdeb17 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -54,11 +54,10 @@ class KafkaConnection(local):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.connect((host, port))
self.timeout = timeout
- self._sock.settimeout(self.timeout)
- self._dirty = False
+ self._sock = None
+
+ self.reinit()
def __repr__(self):
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -73,24 +72,28 @@ class KafkaConnection(local):
def _read_bytes(self, num_bytes):
bytes_left = num_bytes
- resp = ''
+ responses = []
+
log.debug("About to read %d bytes from Kafka", num_bytes)
if self._dirty:
self.reinit()
+
while bytes_left:
try:
- data = self._sock.recv(bytes_left)
+ data = self._sock.recv(min(bytes_left, 4096))
except socket.error:
log.exception('Unable to receive data from Kafka')
self._raise_connection_error()
+
if data == '':
log.error("Not enough data to read this response")
self._raise_connection_error()
+
bytes_left -= len(data)
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
- resp += data
+ responses.append(data)
- return resp
+ return ''.join(responses)
##################
# Public API #
diff --git a/load_example.py b/load_example.py
new file mode 100755
index 0000000..1f8b418
--- /dev/null
+++ b/load_example.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+import threading, logging, time, collections
+
+from kafka.client import KafkaClient
+from kafka.consumer import SimpleConsumer
+from kafka.producer import SimpleProducer
+
+msg_size = 524288
+
+class Producer(threading.Thread):
+ daemon = True
+ big_msg = "1" * msg_size
+
+ def run(self):
+ client = KafkaClient("localhost:9092")
+ producer = SimpleProducer(client)
+ self.sent = 0
+
+ while True:
+ producer.send_messages('my-topic', self.big_msg)
+ self.sent += 1
+
+
+class Consumer(threading.Thread):
+ daemon = True
+
+ def run(self):
+ client = KafkaClient("localhost:9092")
+ consumer = SimpleConsumer(client, "test-group", "my-topic",
+ max_buffer_size = None,
+ )
+ self.valid = 0
+ self.invalid = 0
+
+ for message in consumer:
+ if len(message.message.value) == msg_size:
+ self.valid += 1
+ else:
+ self.invalid += 1
+
+def main():
+ threads = [
+ Producer(),
+ Consumer()
+ ]
+
+ for t in threads:
+ t.start()
+
+ time.sleep(10)
+ print 'Messages sent: %d' % threads[0].sent
+ print 'Messages recvd: %d' % threads[1].valid
+ print 'Messages invalid: %d' % threads[1].invalid
+
+if __name__ == "__main__":
+ logging.basicConfig(
+ format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
+ level=logging.DEBUG
+ )
+ main()