summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMostafa-Elmenbawy <elmenabawym@gmail.com>2020-09-07 23:23:11 +0000
committerGitHub <noreply@github.com>2020-09-07 16:23:11 -0700
commit3a9d8306137e6f1c7481a5ca2c4b27f62cbb5165 (patch)
treeebdb48538b28aa98a8b11c082e948e95fc76ee30
parentf6677cf616aace9b9d9ed2b764d3b52ace7d4230 (diff)
downloadkafka-python-3a9d8306137e6f1c7481a5ca2c4b27f62cbb5165.tar.gz
Update example.py (#2081)
Co-authored-by: MostafaElmenabawy <momenabawy@synapse-analytics.io>
-rwxr-xr-xexample.py46
1 files changed, 28 insertions, 18 deletions
diff --git a/example.py b/example.py
index dac97b7..9907450 100755
--- a/example.py
+++ b/example.py
@@ -1,15 +1,15 @@
#!/usr/bin/env python
-import threading, logging, time
-import multiprocessing
+import threading, time
-from kafka import KafkaConsumer, KafkaProducer
+from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
+from kafka.admin import NewTopic
class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.stop_event = threading.Event()
-
+
def stop(self):
self.stop_event.set()
@@ -23,14 +23,15 @@ class Producer(threading.Thread):
producer.close()
-class Consumer(multiprocessing.Process):
+
+class Consumer(threading.Thread):
def __init__(self):
- multiprocessing.Process.__init__(self)
- self.stop_event = multiprocessing.Event()
-
+ threading.Thread.__init__(self)
+ self.stop_event = threading.Event()
+
def stop(self):
self.stop_event.set()
-
+
def run(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
@@ -44,29 +45,38 @@ class Consumer(multiprocessing.Process):
break
consumer.close()
-
-
+
+
def main():
+ # Create 'my-topic' Kafka topic
+ try:
+ admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
+
+ topic = NewTopic(name='my-topic',
+ num_partitions=1,
+ replication_factor=1)
+ admin.create_topics([topic])
+ except Exception:
+ pass
+
tasks = [
Producer(),
Consumer()
]
+ # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic
for t in tasks:
t.start()
time.sleep(10)
-
+
+ # Stop threads
for task in tasks:
task.stop()
for task in tasks:
task.join()
-
-
+
+
if __name__ == "__main__":
- logging.basicConfig(
- format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
- level=logging.INFO
- )
main()