diff options
author | Mostafa-Elmenbawy <elmenabawym@gmail.com> | 2020-09-07 23:23:11 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-07 16:23:11 -0700 |
commit | 3a9d8306137e6f1c7481a5ca2c4b27f62cbb5165 (patch) | |
tree | ebdb48538b28aa98a8b11c082e948e95fc76ee30 | |
parent | f6677cf616aace9b9d9ed2b764d3b52ace7d4230 (diff) | |
download | kafka-python-3a9d8306137e6f1c7481a5ca2c4b27f62cbb5165.tar.gz |
Update example.py (#2081)
Co-authored-by: MostafaElmenabawy <momenabawy@synapse-analytics.io>
-rwxr-xr-x | example.py | 46 |
1 files changed, 28 insertions, 18 deletions
@@ -1,15 +1,15 @@ #!/usr/bin/env python -import threading, logging, time -import multiprocessing +import threading, time -from kafka import KafkaConsumer, KafkaProducer +from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer +from kafka.admin import NewTopic class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.stop_event = threading.Event() - + def stop(self): self.stop_event.set() @@ -23,14 +23,15 @@ class Producer(threading.Thread): producer.close() -class Consumer(multiprocessing.Process): + +class Consumer(threading.Thread): def __init__(self): - multiprocessing.Process.__init__(self) - self.stop_event = multiprocessing.Event() - + threading.Thread.__init__(self) + self.stop_event = threading.Event() + def stop(self): self.stop_event.set() - + def run(self): consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', @@ -44,29 +45,38 @@ class Consumer(multiprocessing.Process): break consumer.close() - - + + def main(): + # Create 'my-topic' Kafka topic + try: + admin = KafkaAdminClient(bootstrap_servers='localhost:9092') + + topic = NewTopic(name='my-topic', + num_partitions=1, + replication_factor=1) + admin.create_topics([topic]) + except Exception: + pass + tasks = [ Producer(), Consumer() ] + # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic for t in tasks: t.start() time.sleep(10) - + + # Stop threads for task in tasks: task.stop() for task in tasks: task.join() - - + + if __name__ == "__main__": - logging.basicConfig( - format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', - level=logging.INFO - ) main() |