summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenn Roth <TheAtomicOption@users.noreply.github.com>2017-10-21 14:21:55 -0700
committerDana Powers <dana.powers@gmail.com>2017-10-21 14:21:55 -0700
commitfaf1749f3866a52b6d659a39dd04d0b635dd6a3d (patch)
treebf794aefc70d9b0960ae639ac5418e4e8b9b9bd5
parent146b893e0fbac21150f74a8ba2f17cc64e1714ad (diff)
downloadkafka-python-faf1749f3866a52b6d659a39dd04d0b635dd6a3d.tar.gz
Added controlled thread shutdown to example.py (#1268)
-rwxr-xr-xexample.py40
1 files changed, 32 insertions, 8 deletions
diff --git a/example.py b/example.py
index 2431ee2..dac97b7 100755
--- a/example.py
+++ b/example.py
@@ -6,29 +6,46 @@ from kafka import KafkaConsumer, KafkaProducer
class Producer(threading.Thread):
- daemon = True
+ def __init__(self):
+ threading.Thread.__init__(self)
+ self.stop_event = threading.Event()
+
+ def stop(self):
+ self.stop_event.set()
def run(self):
producer = KafkaProducer(bootstrap_servers='localhost:9092')
- while True:
+ while not self.stop_event.is_set():
producer.send('my-topic', b"test")
producer.send('my-topic', b"\xc2Hola, mundo!")
time.sleep(1)
+ producer.close()
class Consumer(multiprocessing.Process):
- daemon = True
-
+ def __init__(self):
+ multiprocessing.Process.__init__(self)
+ self.stop_event = multiprocessing.Event()
+
+ def stop(self):
+ self.stop_event.set()
+
def run(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
- auto_offset_reset='earliest')
+ auto_offset_reset='earliest',
+ consumer_timeout_ms=1000)
consumer.subscribe(['my-topic'])
- for message in consumer:
- print (message)
-
+ while not self.stop_event.is_set():
+ for message in consumer:
+ print(message)
+ if self.stop_event.is_set():
+ break
+ consumer.close()
+
+
def main():
tasks = [
Producer(),
@@ -39,7 +56,14 @@ def main():
t.start()
time.sleep(10)
+
+ for task in tasks:
+ task.stop()
+ for task in tasks:
+ task.join()
+
+
if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',