From 7a7a890d7f50327d17358559d769e26b5268167e Mon Sep 17 00:00:00 2001 From: PandllCom Date: Mon, 30 Sep 2019 22:24:29 +0800 Subject: Added a function to determine if bootstrap is successfully connected (#1876) --- kafka/consumer/group.py | 6 ++++++ kafka/producer/kafka.py | 21 ++++++++++++++------- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 231fc8a..a55bec1 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -391,6 +391,12 @@ class KafkaConsumer(six.Iterator): self._subscription.subscribe(topics=topics) self._client.set_topics(topics) + def bootstrap_connected(self): + """Return True if the bootstrap is connected.""" + if self._client._bootstrap_fails > 0: + return False + return True + def assign(self, partitions): """Manually assign a list of TopicPartitions to this consumer. diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index e6bd3b9..95e797a 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -23,7 +23,6 @@ from kafka.record.legacy_records import LegacyRecordBatchBuilder from kafka.serializer import Serializer from kafka.structs import TopicPartition - log = logging.getLogger(__name__) PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger() @@ -376,13 +375,13 @@ class KafkaProducer(object): reporters = [reporter() for reporter in self.config['metric_reporters']] self._metrics = Metrics(metric_config, reporters) - client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', - wakeup_timeout_ms=self.config['max_block_ms'], - **self.config) + self._client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', + wakeup_timeout_ms=self.config['max_block_ms'], + **self.config) # Get auto-discovered version from client if necessary if self.config['api_version'] is None: - self.config['api_version'] = client.config['api_version'] + self.config['api_version'] = self._client.config['api_version'] if self.config['compression_type'] == 'lz4': assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' @@ -398,9 +397,9 @@ class KafkaProducer(object): message_version = self._max_usable_produce_magic() self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config) - self._metadata = client.cluster + self._metadata = self._client.cluster guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1) - self._sender = Sender(client, self._metadata, + self._sender = Sender(self._client, self._metadata, self._accumulator, self._metrics, guarantee_message_order=guarantee_message_order, **self.config) @@ -412,14 +411,22 @@ class KafkaProducer(object): atexit.register(self._cleanup) log.debug("Kafka producer started") + def bootstrap_connected(self): + """Return True if the bootstrap is connected.""" + if self._client._bootstrap_fails > 0: + return False + return True + def _cleanup_factory(self): """Build a cleanup clojure that doesn't increase our ref count""" _self = weakref.proxy(self) + def wrapper(): try: _self.close(timeout=0) except (ReferenceError, AttributeError): pass + return wrapper def _unregister_cleanup(self): -- cgit v1.2.1