summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-18 22:46:39 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-18 22:46:39 -0800
commit6a2466b4d9616c328660301a875c4d199430b251 (patch)
treeac5a77fa45a078edfb820a4bb7773a2736ffe37c
parent99bc503b1a549bd0877706ec8f04f7cb35445cda (diff)
downloadkafka-python-6a2466b4d9616c328660301a875c4d199430b251.tar.gz
Migrate load_example.py to KafkaProducer / KafkaConsumer
-rwxr-xr-xload_example.py39
1 files changed, 22 insertions, 17 deletions
diff --git a/load_example.py b/load_example.py
index 1f8b418..a3b09ba 100755
--- a/load_example.py
+++ b/load_example.py
@@ -1,43 +1,46 @@
#!/usr/bin/env python
-import threading, logging, time, collections
+import threading, logging, time
-from kafka.client import KafkaClient
-from kafka.consumer import SimpleConsumer
-from kafka.producer import SimpleProducer
+from kafka import KafkaConsumer, KafkaProducer
msg_size = 524288
+producer_stop = threading.Event()
+consumer_stop = threading.Event()
+
class Producer(threading.Thread):
- daemon = True
- big_msg = "1" * msg_size
+ big_msg = b'1' * msg_size
def run(self):
- client = KafkaClient("localhost:9092")
- producer = SimpleProducer(client)
+ producer = KafkaProducer(bootstrap_servers='localhost:9092')
self.sent = 0
- while True:
- producer.send_messages('my-topic', self.big_msg)
+ while not producer_stop.is_set():
+ producer.send('my-topic', self.big_msg)
self.sent += 1
+ producer.flush()
class Consumer(threading.Thread):
- daemon = True
def run(self):
- client = KafkaClient("localhost:9092")
- consumer = SimpleConsumer(client, "test-group", "my-topic",
- max_buffer_size = None,
- )
+ consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
+ auto_offset_reset='earliest')
+ consumer.subscribe(['my-topic'])
self.valid = 0
self.invalid = 0
for message in consumer:
- if len(message.message.value) == msg_size:
+ if len(message.value) == msg_size:
self.valid += 1
else:
self.invalid += 1
+ if consumer_stop.is_set():
+ break
+
+ consumer.close()
+
def main():
threads = [
Producer(),
@@ -48,6 +51,8 @@ def main():
t.start()
time.sleep(10)
+ producer_stop.set()
+ consumer_stop.set()
print 'Messages sent: %d' % threads[0].sent
print 'Messages recvd: %d' % threads[1].valid
print 'Messages invalid: %d' % threads[1].invalid
@@ -55,6 +60,6 @@ def main():
if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
- level=logging.DEBUG
+ level=logging.INFO
)
main()