From faf1749f3866a52b6d659a39dd04d0b635dd6a3d Mon Sep 17 00:00:00 2001 From: Benn Roth Date: Sat, 21 Oct 2017 14:21:55 -0700 Subject: Added controlled thread shutdown to example.py (#1268) --- example.py | 40 ++++++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/example.py b/example.py index 2431ee2..dac97b7 100755 --- a/example.py +++ b/example.py @@ -6,29 +6,46 @@ from kafka import KafkaConsumer, KafkaProducer class Producer(threading.Thread): - daemon = True + def __init__(self): + threading.Thread.__init__(self) + self.stop_event = threading.Event() + + def stop(self): + self.stop_event.set() def run(self): producer = KafkaProducer(bootstrap_servers='localhost:9092') - while True: + while not self.stop_event.is_set(): producer.send('my-topic', b"test") producer.send('my-topic', b"\xc2Hola, mundo!") time.sleep(1) + producer.close() class Consumer(multiprocessing.Process): - daemon = True - + def __init__(self): + multiprocessing.Process.__init__(self) + self.stop_event = multiprocessing.Event() + + def stop(self): + self.stop_event.set() + def run(self): consumer = KafkaConsumer(bootstrap_servers='localhost:9092', - auto_offset_reset='earliest') + auto_offset_reset='earliest', + consumer_timeout_ms=1000) consumer.subscribe(['my-topic']) - for message in consumer: - print (message) - + while not self.stop_event.is_set(): + for message in consumer: + print(message) + if self.stop_event.is_set(): + break + consumer.close() + + def main(): tasks = [ Producer(), @@ -39,7 +56,14 @@ def main(): t.start() time.sleep(10) + + for task in tasks: + task.stop() + for task in tasks: + task.join() + + if __name__ == "__main__": logging.basicConfig( format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', -- cgit v1.2.1