diff options
author | Benn Roth <TheAtomicOption@users.noreply.github.com> | 2017-10-21 14:21:55 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2017-10-21 14:21:55 -0700 |
commit | faf1749f3866a52b6d659a39dd04d0b635dd6a3d (patch) | |
tree | bf794aefc70d9b0960ae639ac5418e4e8b9b9bd5 | |
parent | 146b893e0fbac21150f74a8ba2f17cc64e1714ad (diff) | |
download | kafka-python-faf1749f3866a52b6d659a39dd04d0b635dd6a3d.tar.gz |
Added controlled thread shutdown to example.py (#1268)
-rwxr-xr-x | example.py | 40 |
1 files changed, 32 insertions, 8 deletions
@@ -6,29 +6,46 @@ from kafka import KafkaConsumer, KafkaProducer class Producer(threading.Thread): - daemon = True + def __init__(self): + threading.Thread.__init__(self) + self.stop_event = threading.Event() + + def stop(self): + self.stop_event.set() def run(self): producer = KafkaProducer(bootstrap_servers='localhost:9092') - while True: + while not self.stop_event.is_set(): producer.send('my-topic', b"test") producer.send('my-topic', b"\xc2Hola, mundo!") time.sleep(1) + producer.close() class Consumer(multiprocessing.Process): - daemon = True - + def __init__(self): + multiprocessing.Process.__init__(self) + self.stop_event = multiprocessing.Event() + + def stop(self): + self.stop_event.set() + def run(self): consumer = KafkaConsumer(bootstrap_servers='localhost:9092', - auto_offset_reset='earliest') + auto_offset_reset='earliest', + consumer_timeout_ms=1000) consumer.subscribe(['my-topic']) - for message in consumer: - print (message) - + while not self.stop_event.is_set(): + for message in consumer: + print(message) + if self.stop_event.is_set(): + break + consumer.close() + + def main(): tasks = [ Producer(), @@ -39,7 +56,14 @@ def main(): t.start() time.sleep(10) + + for task in tasks: + task.stop() + for task in tasks: + task.join() + + if __name__ == "__main__": logging.basicConfig( format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', |