diff options
author | David Arthur <mumrah@gmail.com> | 2012-11-20 10:16:15 -0500 |
---|---|---|
committer | David Arthur <mumrah@gmail.com> | 2012-11-20 10:16:15 -0500 |
commit | ddae03d95f9cbb3d460733f698794fa936a45119 (patch) | |
tree | 8626db81f6578569b585d6927a32a5068c2973cb | |
parent | 1d0bc784d41364d6178dce452f2ac787b52749a8 (diff) | |
download | kafka-python-ddae03d95f9cbb3d460733f698794fa936a45119.tar.gz |
Adding client_fetch_size to queue interface
Also more docs
-rw-r--r-- | README.md | 23 | ||||
-rw-r--r-- | kafka/queue.py | 37 |
2 files changed, 57 insertions, 3 deletions
@@ -172,3 +172,26 @@ req = ProduceRequest(topic, 1, messages) kafka.send_message_set(req) kafka.close() ``` + +## Use Kafka like a FIFO queue + +Simple API: `get`, `put`, `close`. + +```python +kafka = KafkaClient("localhost", 9092) +q = KafkaQueue(kafka, "my-topic", [0,1]) +q.put("first") +q.put("second") +q.get() # first +q.get() # second +q.close() +kafka.close() +``` + +Since the producer and consumers are backed by actual `multiprocessing.Queue`, you can +do blocking or non-blocking puts and gets. + +```python +q.put("first", block=False) +q.get(block=True, timeout=10) +``` diff --git a/kafka/queue.py b/kafka/queue.py index b86b1db..d4f5b6c 100644 --- a/kafka/queue.py +++ b/kafka/queue.py @@ -9,12 +9,13 @@ from .client import KafkaClient, FetchRequest, ProduceRequest log = logging.getLogger("kafka") class KafkaConsumerProcess(Process): - def __init__(self, client, topic, partition, out_queue, barrier, consumer_sleep=200): + def __init__(self, client, topic, partition, out_queue, barrier, consumer_fetch_size=1024, consumer_sleep=200): self.client = copy(client) self.topic = topic self.partition = partition self.out_queue = out_queue self.barrier = barrier + self.consumer_fetch_size = consumer_fetch_size self.consumer_sleep = consumer_sleep / 1000. log.info("Initializing %s" % self) Process.__init__(self) @@ -26,7 +27,7 @@ class KafkaConsumerProcess(Process): def run(self): self.barrier.wait() log.info("Starting %s" % self) - fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.client.bufsize) + fetchRequest = FetchRequest(self.topic, self.partition, offset=0, size=self.consumer_fetch_size) while True: if self.barrier.is_set() == False: log.info("Shutdown %s" % self) @@ -91,6 +92,10 @@ class KafkaQueue(object): """ KafkaQueue a Queue-like object backed by a Kafka producer and some number of consumers + Messages are eagerly loaded by the consumer in batches of size consumer_fetch_size. + Messages are buffered in the producer thread until producer_flush_timeout or + producer_flush_buffer is reached. + Params ====== client: KafkaClient object @@ -101,6 +106,8 @@ class KafkaQueue(object): Consumer Config =============== + consumer_fetch_size: int, number of bytes to fetch in one call to Kafka. Default + is 1024 consumer_sleep: int, time in milliseconds a consumer should sleep when it reaches the end of a partition. Default is 200 @@ -133,12 +140,36 @@ class KafkaQueue(object): self.barrier.set() def get(self, block=True, timeout=None): + """ + Consume a message from Kafka + + Params + ====== + block: boolean, default True + timeout: int, number of seconds to wait when blocking, default None + + Returns + ======= + msg: str, the payload from Kafka + """ return self.in_queue.get(block, timeout).payload def put(self, msg, block=True, timeout=None): - return self.out_queue.put(msg, block, timeout) + """ + Send a message to Kafka + + Params + ====== + msg: std, the message to send + block: boolean, default True + timeout: int, number of seconds to wait when blocking, default None + """ + self.out_queue.put(msg, block, timeout) def close(self): + """ + Close the internal queues and Kafka consumers/producer + """ self.in_queue.close() self.out_queue.close() self.barrier.clear() |