From bea1a2adacc662abe2b041bc38bfc452bb12caab Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 May 2016 12:40:15 -0700 Subject: Move load_example.py to benchmarks/ --- benchmarks/load_example.py | 65 ++++++++++++++++++++++++++++++++++++++++++++++ load_example.py | 65 ---------------------------------------------- 2 files changed, 65 insertions(+), 65 deletions(-) create mode 100755 benchmarks/load_example.py delete mode 100755 load_example.py diff --git a/benchmarks/load_example.py b/benchmarks/load_example.py new file mode 100755 index 0000000..a3b09ba --- /dev/null +++ b/benchmarks/load_example.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +import threading, logging, time + +from kafka import KafkaConsumer, KafkaProducer + +msg_size = 524288 + +producer_stop = threading.Event() +consumer_stop = threading.Event() + +class Producer(threading.Thread): + big_msg = b'1' * msg_size + + def run(self): + producer = KafkaProducer(bootstrap_servers='localhost:9092') + self.sent = 0 + + while not producer_stop.is_set(): + producer.send('my-topic', self.big_msg) + self.sent += 1 + producer.flush() + + +class Consumer(threading.Thread): + + def run(self): + consumer = KafkaConsumer(bootstrap_servers='localhost:9092', + auto_offset_reset='earliest') + consumer.subscribe(['my-topic']) + self.valid = 0 + self.invalid = 0 + + for message in consumer: + if len(message.value) == msg_size: + self.valid += 1 + else: + self.invalid += 1 + + if consumer_stop.is_set(): + break + + consumer.close() + +def main(): + threads = [ + Producer(), + Consumer() + ] + + for t in threads: + t.start() + + time.sleep(10) + producer_stop.set() + consumer_stop.set() + print 'Messages sent: %d' % threads[0].sent + print 'Messages recvd: %d' % threads[1].valid + print 'Messages invalid: %d' % threads[1].invalid + +if __name__ == "__main__": + logging.basicConfig( + format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', + level=logging.INFO + ) + main() diff --git a/load_example.py b/load_example.py deleted file mode 100755 index a3b09ba..0000000 --- a/load_example.py +++ /dev/null @@ -1,65 +0,0 @@ -#!/usr/bin/env python -import threading, logging, time - -from kafka import KafkaConsumer, KafkaProducer - -msg_size = 524288 - -producer_stop = threading.Event() -consumer_stop = threading.Event() - -class Producer(threading.Thread): - big_msg = b'1' * msg_size - - def run(self): - producer = KafkaProducer(bootstrap_servers='localhost:9092') - self.sent = 0 - - while not producer_stop.is_set(): - producer.send('my-topic', self.big_msg) - self.sent += 1 - producer.flush() - - -class Consumer(threading.Thread): - - def run(self): - consumer = KafkaConsumer(bootstrap_servers='localhost:9092', - auto_offset_reset='earliest') - consumer.subscribe(['my-topic']) - self.valid = 0 - self.invalid = 0 - - for message in consumer: - if len(message.value) == msg_size: - self.valid += 1 - else: - self.invalid += 1 - - if consumer_stop.is_set(): - break - - consumer.close() - -def main(): - threads = [ - Producer(), - Consumer() - ] - - for t in threads: - t.start() - - time.sleep(10) - producer_stop.set() - consumer_stop.set() - print 'Messages sent: %d' % threads[0].sent - print 'Messages recvd: %d' % threads[1].valid - print 'Messages invalid: %d' % threads[1].invalid - -if __name__ == "__main__": - logging.basicConfig( - format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.INFO - ) - main() -- cgit v1.2.1