diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-12-05 18:26:07 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-12-05 18:26:07 -0800 |
commit | 3b422544ac15917465c3a0e0096b17f16fa9d193 (patch) | |
tree | a6ab38737208ed5944c7839f106aadf03c13572c | |
parent | 0ab97a99088c2871c3ab30dbc0dbf13f5f414433 (diff) | |
parent | 5682ff63534bb2579a6835d40afcad170f6bdd7c (diff) | |
download | kafka-python-3b422544ac15917465c3a0e0096b17f16fa9d193.tar.gz |
Merge pull request #485 from dpkp/async_producer_stop
Producer.stop() should block until async thread completes
-rw-r--r-- | kafka/client.py | 6 | ||||
-rw-r--r-- | kafka/producer/base.py | 19 | ||||
-rw-r--r-- | test/test_producer_integration.py | 6 |
3 files changed, 15 insertions, 16 deletions
diff --git a/kafka/client.py b/kafka/client.py index b3dcf51..9018bb4 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -607,11 +607,7 @@ class KafkaClient(object): else: decoder = KafkaProtocol.decode_produce_response - try: - resps = self._send_broker_aware_request(payloads, encoder, decoder) - except Exception: - if fail_on_error: - raise + resps = self._send_broker_aware_request(payloads, encoder, decoder) return [resp if not callback else callback(resp) for resp in resps if resp is not None and diff --git a/kafka/producer/base.py b/kafka/producer/base.py index f2c7cfe..1ba4f5b 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -415,17 +415,22 @@ class Producer(object): raise return resp - def stop(self, timeout=1): + def stop(self): """ - Stop the producer. Optionally wait for the specified timeout before - forcefully cleaning up. + Stop the producer (async mode). Blocks until async thread completes. """ + if not self.async: + log.warning("producer.stop() called, but producer is not async") + return + + if self.stopped: + log.warning("producer.stop() called, but producer is already stopped") + return + if self.async: self.queue.put((STOP_ASYNC_PRODUCER, None, None)) - self.thread.join(timeout) - - if self.thread.is_alive(): - self.thread_stop_event.set() + self.thread_stop_event.set() + self.thread.join() if hasattr(self, '_cleanup_func'): # Remove cleanup handler now that we've stopped diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 46b6851..c99ed63 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -204,13 +204,11 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): resp = producer.send_messages(self.topic, self.msg("one")) self.assertEqual(len(resp), 0) - # wait for the server to report a new highwatermark - while self.current_offset(self.topic, partition) == start_offset: - time.sleep(0.1) + # flush messages + producer.stop() self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) - producer.stop() @kafka_versions("all") def test_batched_simple_producer__triggers_by_message(self): |