diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-18 22:46:39 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-18 22:46:39 -0800 |
commit | 6a2466b4d9616c328660301a875c4d199430b251 (patch) | |
tree | ac5a77fa45a078edfb820a4bb7773a2736ffe37c | |
parent | 99bc503b1a549bd0877706ec8f04f7cb35445cda (diff) | |
download | kafka-python-6a2466b4d9616c328660301a875c4d199430b251.tar.gz |
Migrate load_example.py to KafkaProducer / KafkaConsumer
-rwxr-xr-x | load_example.py | 39 |
1 files changed, 22 insertions, 17 deletions
diff --git a/load_example.py b/load_example.py index 1f8b418..a3b09ba 100755 --- a/load_example.py +++ b/load_example.py @@ -1,43 +1,46 @@ #!/usr/bin/env python -import threading, logging, time, collections +import threading, logging, time -from kafka.client import KafkaClient -from kafka.consumer import SimpleConsumer -from kafka.producer import SimpleProducer +from kafka import KafkaConsumer, KafkaProducer msg_size = 524288 +producer_stop = threading.Event() +consumer_stop = threading.Event() + class Producer(threading.Thread): - daemon = True - big_msg = "1" * msg_size + big_msg = b'1' * msg_size def run(self): - client = KafkaClient("localhost:9092") - producer = SimpleProducer(client) + producer = KafkaProducer(bootstrap_servers='localhost:9092') self.sent = 0 - while True: - producer.send_messages('my-topic', self.big_msg) + while not producer_stop.is_set(): + producer.send('my-topic', self.big_msg) self.sent += 1 + producer.flush() class Consumer(threading.Thread): - daemon = True def run(self): - client = KafkaClient("localhost:9092") - consumer = SimpleConsumer(client, "test-group", "my-topic", - max_buffer_size = None, - ) + consumer = KafkaConsumer(bootstrap_servers='localhost:9092', + auto_offset_reset='earliest') + consumer.subscribe(['my-topic']) self.valid = 0 self.invalid = 0 for message in consumer: - if len(message.message.value) == msg_size: + if len(message.value) == msg_size: self.valid += 1 else: self.invalid += 1 + if consumer_stop.is_set(): + break + + consumer.close() + def main(): threads = [ Producer(), @@ -48,6 +51,8 @@ def main(): t.start() time.sleep(10) + producer_stop.set() + consumer_stop.set() print 'Messages sent: %d' % threads[0].sent print 'Messages recvd: %d' % threads[1].valid print 'Messages invalid: %d' % threads[1].invalid @@ -55,6 +60,6 @@ def main(): if __name__ == "__main__": logging.basicConfig( format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.DEBUG + level=logging.INFO ) main() |