summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2012-11-20 10:16:15 -0500
committerDavid Arthur <mumrah@gmail.com>2012-11-20 10:16:15 -0500
commitddae03d95f9cbb3d460733f698794fa936a45119 (patch)
tree8626db81f6578569b585d6927a32a5068c2973cb
parent1d0bc784d41364d6178dce452f2ac787b52749a8 (diff)
downloadkafka-python-ddae03d95f9cbb3d460733f698794fa936a45119.tar.gz
Adding client_fetch_size to queue interface
Also more docs
-rw-r--r--README.md23
-rw-r--r--kafka/queue.py37
2 files changed, 57 insertions, 3 deletions
diff --git a/README.md b/README.md
index 01d8e0c..f421a84 100644
--- a/README.md
+++ b/README.md
@@ -172,3 +172,26 @@ req = ProduceRequest(topic, 1, messages)
kafka.send_message_set(req)
kafka.close()
```
+
+## Use Kafka like a FIFO queue
+
+Simple API: `get`, `put`, `close`.
+
+```python
+kafka = KafkaClient("localhost", 9092)
+q = KafkaQueue(kafka, "my-topic", [0,1])
+q.put("first")
+q.put("second")
+q.get() # first
+q.get() # second
+q.close()
+kafka.close()
+```
+
+Since the producer and consumers are backed by actual `multiprocessing.Queue`, you can
+do blocking or non-blocking puts and gets.
+
+```python
+q.put("first", block=False)
+q.get(block=True, timeout=10)
+```
diff --git a/kafka/queue.py b/kafka/queue.py
index b86b1db..d4f5b6c 100644
--- a/kafka/queue.py
+++ b/kafka/queue.py
@@ -9,12 +9,13 @@ from .client import KafkaClient, FetchRequest, ProduceRequest
log = logging.getLogger("kafka")
class KafkaConsumerProcess(Process):
- def __init__(self, client, topic, partition, out_queue, barrier, consumer_sleep=200):
+ def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200):
self.client = copy(client)
self.topic = topic
self.partition = partition
self.out_queue = out_queue
self.barrier = barrier
+ self.consumer_fetch_size = consumer_fetch_size
self.consumer_sleep = consumer_sleep / 1000.
log.info("Initializing %s" % self)
Process.__init__(self)
@@ -26,7 +27,7 @@ class KafkaConsumerProcess(Process):
def run(self):
self.barrier.wait()
log.info("Starting %s" % self)
- fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize)
+ fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.consumer_fetch_size)
while True:
if self.barrier.is_set() == False:
log.info("Shutdown %s" % self)
@@ -91,6 +92,10 @@ class KafkaQueue(object):
"""
KafkaQueue a Queue-like object backed by a Kafka producer and some number of consumers
+ Messages are eagerly loaded by the consumer in batches of size consumer_fetch_size.
+ Messages are buffered in the producer thread until producer_flush_timeout or
+ producer_flush_buffer is reached.
+
Params
======
client: KafkaClient object
@@ -101,6 +106,8 @@ class KafkaQueue(object):
Consumer Config
===============
+ consumer_fetch_size: int, number of bytes to fetch in one call to Kafka. Default
+ is 1024
consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches
the end of a partition. Default is 200
@@ -133,12 +140,36 @@ class KafkaQueue(object):
self.barrier.set()
def get(self, block=True, timeout=None):
+ """
+ Consume a message from Kafka
+
+ Params
+ ======
+ block: boolean, default True
+ timeout: int, number of seconds to wait when blocking, default None
+
+ Returns
+ =======
+ msg: str, the payload from Kafka
+ """
return self.in_queue.get(block, timeout).payload
def put(self, msg, block=True, timeout=None):
- return self.out_queue.put(msg, block, timeout)
+ """
+ Send a message to Kafka
+
+ Params
+ ======
+    msg: str, the message to send
+ block: boolean, default True
+ timeout: int, number of seconds to wait when blocking, default None
+ """
+ self.out_queue.put(msg, block, timeout)
def close(self):
+ """
+ Close the internal queues and Kafka consumers/producer
+ """
self.in_queue.close()
self.out_queue.close()
self.barrier.clear()