summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-09-30 07:49:59 -0700
committerDana Powers <dana.powers@gmail.com>2019-09-30 07:49:59 -0700
commit87fb1bb48f82bcaa6c5e1a1edadab2832659801c (patch)
tree86a2de52a2cfb85ecad0dcc1a9c3a8c49a5e0f26
parent7a7a890d7f50327d17358559d769e26b5268167e (diff)
downloadkafka-python-87fb1bb48f82bcaa6c5e1a1edadab2832659801c.tar.gz
Improve/refactor bootstrap_connected
-rw-r--r--kafka/client_async.py10
-rw-r--r--kafka/consumer/group.py4
-rw-r--r--kafka/producer/kafka.py19
-rw-r--r--kafka/producer/sender.py3
4 files changed, 22 insertions, 14 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b002797..3ec4ead 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -951,6 +951,16 @@ class KafkaClient(object):
log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms)
self.close(node_id=conn_id)
+ def bootstrap_connected(self):
+ """Return True if a bootstrap node is connected"""
+ for node_id in self._conns:
+ if not self.cluster.is_bootstrap(node_id):
+ continue
+ if self._conns[node_id].connected():
+ return True
+ else:
+ return False
+
# OrderedDict requires python2.7+
try:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index a55bec1..15c2905 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -393,9 +393,7 @@ class KafkaConsumer(six.Iterator):
def bootstrap_connected(self):
"""Return True if the bootstrap is connected."""
- if self._client._bootstrap_fails > 0:
- return False
- return True
+ return self._client.bootstrap_connected()
def assign(self, partitions):
"""Manually assign a list of TopicPartitions to this consumer.
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 95e797a..3ff1a09 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -23,6 +23,7 @@ from kafka.record.legacy_records import LegacyRecordBatchBuilder
from kafka.serializer import Serializer
from kafka.structs import TopicPartition
+
log = logging.getLogger(__name__)
PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger()
@@ -375,13 +376,13 @@ class KafkaProducer(object):
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
- self._client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
- wakeup_timeout_ms=self.config['max_block_ms'],
- **self.config)
+ client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
+ wakeup_timeout_ms=self.config['max_block_ms'],
+ **self.config)
# Get auto-discovered version from client if necessary
if self.config['api_version'] is None:
- self.config['api_version'] = self._client.config['api_version']
+ self.config['api_version'] = client.config['api_version']
if self.config['compression_type'] == 'lz4':
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
@@ -397,9 +398,9 @@ class KafkaProducer(object):
message_version = self._max_usable_produce_magic()
self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
- self._metadata = self._client.cluster
+ self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
- self._sender = Sender(self._client, self._metadata,
+ self._sender = Sender(client, self._metadata,
self._accumulator, self._metrics,
guarantee_message_order=guarantee_message_order,
**self.config)
@@ -413,20 +414,16 @@ class KafkaProducer(object):
def bootstrap_connected(self):
"""Return True if the bootstrap is connected."""
- if self._client._bootstrap_fails > 0:
- return False
- return True
+ return self._sender.bootstrap_connected()
def _cleanup_factory(self):
"""Build a cleanup clojure that doesn't increase our ref count"""
_self = weakref.proxy(self)
-
def wrapper():
try:
_self.close(timeout=0)
except (ReferenceError, AttributeError):
pass
-
return wrapper
def _unregister_cleanup(self):
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 88ec07c..705b58f 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -315,6 +315,9 @@ class Sender(threading.Thread):
"""Wake up the selector associated with this send thread."""
self._client.wakeup()
+ def bootstrap_connected(self):
+ return self._client.bootstrap_connected()
+
class SenderMetrics(object):