From 87fb1bb48f82bcaa6c5e1a1edadab2832659801c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 30 Sep 2019 07:49:59 -0700 Subject: Improve/refactor bootstrap_connected --- kafka/client_async.py | 10 ++++++++++ kafka/consumer/group.py | 4 +--- kafka/producer/kafka.py | 19 ++++++++----------- kafka/producer/sender.py | 3 +++ 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index b002797..3ec4ead 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -951,6 +951,16 @@ class KafkaClient(object): log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms) self.close(node_id=conn_id) + def bootstrap_connected(self): + """Return True if a bootstrap node is connected""" + for node_id in self._conns: + if not self.cluster.is_bootstrap(node_id): + continue + if self._conns[node_id].connected(): + return True + else: + return False + # OrderedDict requires python2.7+ try: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a55bec1..15c2905 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -393,9 +393,7 @@ class KafkaConsumer(six.Iterator): def bootstrap_connected(self): """Return True if the bootstrap is connected.""" - if self._client._bootstrap_fails > 0: - return False - return True + return self._client.bootstrap_connected() def assign(self, partitions): """Manually assign a list of TopicPartitions to this consumer. diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 95e797a..3ff1a09 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -23,6 +23,7 @@ from kafka.record.legacy_records import LegacyRecordBatchBuilder from kafka.serializer import Serializer from kafka.structs import TopicPartition + log = logging.getLogger(__name__) PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger() @@ -375,13 +376,13 @@ class KafkaProducer(object): reporters = [reporter() for reporter in self.config['metric_reporters']] self._metrics = Metrics(metric_config, reporters) - self._client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', - wakeup_timeout_ms=self.config['max_block_ms'], - **self.config) + client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer', + wakeup_timeout_ms=self.config['max_block_ms'], + **self.config) # Get auto-discovered version from client if necessary if self.config['api_version'] is None: - self.config['api_version'] = self._client.config['api_version'] + self.config['api_version'] = client.config['api_version'] if self.config['compression_type'] == 'lz4': assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' @@ -397,9 +398,9 @@ class KafkaProducer(object): message_version = self._max_usable_produce_magic() self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config) - self._metadata = self._client.cluster + self._metadata = client.cluster guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1) - self._sender = Sender(self._client, self._metadata, + self._sender = Sender(client, self._metadata, self._accumulator, self._metrics, guarantee_message_order=guarantee_message_order, **self.config) @@ -413,20 +414,16 @@ class KafkaProducer(object): def bootstrap_connected(self): """Return True if the bootstrap is connected.""" - if self._client._bootstrap_fails > 0: - return False - return True + return self._sender.bootstrap_connected() def _cleanup_factory(self): """Build a cleanup clojure that doesn't increase our ref count""" _self = weakref.proxy(self) - def wrapper(): try: _self.close(timeout=0) except (ReferenceError, AttributeError): pass - return wrapper def _unregister_cleanup(self): diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 88ec07c..705b58f 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -315,6 +315,9 @@ class Sender(threading.Thread): """Wake up the selector associated with this send thread.""" self._client.wakeup() + def bootstrap_connected(self): + return self._client.bootstrap_connected() + class SenderMetrics(object): -- cgit v1.2.1