summaryrefslogtreecommitdiff
path: root/example.py
diff options
context:
space:
mode:
authorMark Roberts <markroberts@kixeye.com>2014-02-25 01:02:39 -0800
committerMark Roberts <markroberts@kixeye.com>2014-02-25 11:25:36 -0800
commitee7e86ea712de0a0390e64752c5cf9180c1681b5 (patch)
tree17e945cad27737d371712303a80ca4b2ab467b18 /example.py
parente5fdc1c7b22c8ad2aaa66a486871d0ed65977e3d (diff)
downloadkafka-python-ee7e86ea712de0a0390e64752c5cf9180c1681b5.tar.gz
Update example.py to compile, add friendly load_example.py
Diffstat (limited to 'example.py')
-rwxr-xr-x[-rw-r--r--]example.py48
1 files changed, 35 insertions, 13 deletions
diff --git a/example.py b/example.py
index 3a2dc92..0eac0a5 100644..100755
--- a/example.py
+++ b/example.py
@@ -1,23 +1,45 @@
-import logging
+#!/usr/bin/env python
+import threading, logging, time
-from kafka.client import KafkaClient, FetchRequest, ProduceRequest
+from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer
-def produce_example(client):
- producer = SimpleProducer(client, "my-topic")
- producer.send_messages("test")
+class Producer(threading.Thread):
+ daemon = True
-def consume_example(client):
- consumer = SimpleConsumer(client, "test-group", "my-topic")
- for message in consumer:
- print(message)
+ def run(self):
+ client = KafkaClient("localhost", 9092)
+ producer = SimpleProducer(client)
+
+ while True:
+ producer.send_messages('my-topic', "test")
+ producer.send_messages('my-topic', "\xc2Hola, mundo!")
+
+ time.sleep(1)
+
+
+class Consumer(threading.Thread):
+ daemon = True
+
+ def run(self):
+ client = KafkaClient("localhost", 9092)
+ consumer = SimpleConsumer(client, "test-group", "my-topic")
+
+ for message in consumer:
+ print(message)
def main():
- client = KafkaClient("localhost", 9092)
- produce_example(client)
- consume_example(client)
+ threads = [
+ Producer(),
+ Consumer()
+ ]
+
+ for t in threads:
+ t.start()
+
+ time.sleep(5)
if __name__ == "__main__":
- logging.basicConfig(level=logging.DEBUG)
+ logging.basicConfig(level=logging.WARN)
main()