From 3a9d8306137e6f1c7481a5ca2c4b27f62cbb5165 Mon Sep 17 00:00:00 2001 From: Mostafa-Elmenbawy Date: Mon, 7 Sep 2020 23:23:11 +0000 Subject: Update example.py (#2081) Co-authored-by: MostafaElmenabawy --- example.py | 46 ++++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/example.py b/example.py index dac97b7..9907450 100755 --- a/example.py +++ b/example.py @@ -1,15 +1,15 @@ #!/usr/bin/env python -import threading, logging, time -import multiprocessing +import threading, time -from kafka import KafkaConsumer, KafkaProducer +from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer +from kafka.admin import NewTopic class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.stop_event = threading.Event() - + def stop(self): self.stop_event.set() @@ -23,14 +23,15 @@ class Producer(threading.Thread): producer.close() -class Consumer(multiprocessing.Process): + +class Consumer(threading.Thread): def __init__(self): - multiprocessing.Process.__init__(self) - self.stop_event = multiprocessing.Event() - + threading.Thread.__init__(self) + self.stop_event = threading.Event() + def stop(self): self.stop_event.set() - + def run(self): consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', @@ -44,29 +45,38 @@ class Consumer(multiprocessing.Process): break consumer.close() - - + + def main(): + # Create 'my-topic' Kafka topic + try: + admin = KafkaAdminClient(bootstrap_servers='localhost:9092') + + topic = NewTopic(name='my-topic', + num_partitions=1, + replication_factor=1) + admin.create_topics([topic]) + except Exception: + pass + tasks = [ Producer(), Consumer() ] + # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic for t in tasks: t.start() time.sleep(10) - + + # Stop threads for task in tasks: task.stop() for task in tasks: task.join() - - + + if __name__ == "__main__": - logging.basicConfig( - format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.INFO - ) main() -- cgit v1.2.1