From 2ba22bf4cebf5e25351816b38cd3cb70e2ea4cb8 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 5 Jun 2015 23:09:50 -0700 Subject: Dont stop async producer until all pending messages have been processed --- kafka/producer/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index a0bf47c..0fd742d 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -73,7 +73,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, request_tries = {} client.reinit() - while not stop_event.is_set(): + while not (stop_event.is_set() and queue.empty() and not request_tries): timeout = batch_time count = batch_size send_at = time.time() + timeout -- cgit v1.2.1