From ee7e86ea712de0a0390e64752c5cf9180c1681b5 Mon Sep 17 00:00:00 2001 From: Mark Roberts Date: Tue, 25 Feb 2014 01:02:39 -0800 Subject: Update example.py to compile, add friendly load_example.py --- example.py | 48 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 13 deletions(-) mode change 100644 => 100755 example.py (limited to 'example.py') diff --git a/example.py b/example.py old mode 100644 new mode 100755 index 3a2dc92..0eac0a5 --- a/example.py +++ b/example.py @@ -1,23 +1,45 @@ -import logging +#!/usr/bin/env python +import threading, logging, time -from kafka.client import KafkaClient, FetchRequest, ProduceRequest +from kafka.client import KafkaClient from kafka.consumer import SimpleConsumer from kafka.producer import SimpleProducer -def produce_example(client): - producer = SimpleProducer(client, "my-topic") - producer.send_messages("test") +class Producer(threading.Thread): + daemon = True -def consume_example(client): - consumer = SimpleConsumer(client, "test-group", "my-topic") - for message in consumer: - print(message) + def run(self): + client = KafkaClient("localhost", 9092) + producer = SimpleProducer(client) + + while True: + producer.send_messages('my-topic', "test") + producer.send_messages('my-topic', "\xc2Hola, mundo!") + + time.sleep(1) + + +class Consumer(threading.Thread): + daemon = True + + def run(self): + client = KafkaClient("localhost", 9092) + consumer = SimpleConsumer(client, "test-group", "my-topic") + + for message in consumer: + print(message) def main(): - client = KafkaClient("localhost", 9092) - produce_example(client) - consume_example(client) + threads = [ + Producer(), + Consumer() + ] + + for t in threads: + t.start() + + time.sleep(5) if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.WARN) main() -- cgit v1.2.1