summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-12-05 18:26:07 -0800
committerDana Powers <dana.powers@gmail.com>2015-12-05 18:26:07 -0800
commit3b422544ac15917465c3a0e0096b17f16fa9d193 (patch)
treea6ab38737208ed5944c7839f106aadf03c13572c
parent0ab97a99088c2871c3ab30dbc0dbf13f5f414433 (diff)
parent5682ff63534bb2579a6835d40afcad170f6bdd7c (diff)
downloadkafka-python-3b422544ac15917465c3a0e0096b17f16fa9d193.tar.gz
Merge pull request #485 from dpkp/async_producer_stop
Producer.stop() should block until async thread completes
-rw-r--r--kafka/client.py6
-rw-r--r--kafka/producer/base.py19
-rw-r--r--test/test_producer_integration.py6
3 files changed, 15 insertions, 16 deletions
diff --git a/kafka/client.py b/kafka/client.py
index b3dcf51..9018bb4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -607,11 +607,7 @@ class KafkaClient(object):
else:
decoder = KafkaProtocol.decode_produce_response
- try:
- resps = self._send_broker_aware_request(payloads, encoder, decoder)
- except Exception:
- if fail_on_error:
- raise
+ resps = self._send_broker_aware_request(payloads, encoder, decoder)
return [resp if not callback else callback(resp) for resp in resps
if resp is not None and
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index f2c7cfe..1ba4f5b 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -415,17 +415,22 @@ class Producer(object):
raise
return resp
- def stop(self, timeout=1):
+ def stop(self):
"""
- Stop the producer. Optionally wait for the specified timeout before
- forcefully cleaning up.
+ Stop the producer (async mode). Blocks until async thread completes.
"""
+ if not self.async:
+ log.warning("producer.stop() called, but producer is not async")
+ return
+
+ if self.stopped:
+ log.warning("producer.stop() called, but producer is already stopped")
+ return
+
if self.async:
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
- self.thread.join(timeout)
-
- if self.thread.is_alive():
- self.thread_stop_event.set()
+ self.thread_stop_event.set()
+ self.thread.join()
if hasattr(self, '_cleanup_func'):
# Remove cleanup handler now that we've stopped
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 46b6851..c99ed63 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -204,13 +204,11 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
resp = producer.send_messages(self.topic, self.msg("one"))
self.assertEqual(len(resp), 0)
- # wait for the server to report a new highwatermark
- while self.current_offset(self.topic, partition) == start_offset:
- time.sleep(0.1)
+ # flush messages
+ producer.stop()
self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
- producer.stop()
@kafka_versions("all")
def test_batched_simple_producer__triggers_by_message(self):