summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPandllCom <lynnheavn@126.com>2019-09-30 22:24:29 +0800
committerDana Powers <dana.powers@gmail.com>2019-09-30 07:24:29 -0700
commit7a7a890d7f50327d17358559d769e26b5268167e (patch)
tree18a9ed2ed6b8f7384072dcf355a70c83e3f84cd2
parent298cb0dbef58f6bb267235911b6ca86039bf8cda (diff)
downloadkafka-python-7a7a890d7f50327d17358559d769e26b5268167e.tar.gz
Added a function to determine if bootstrap is successfully connected (#1876)
-rw-r--r--kafka/consumer/group.py6
-rw-r--r--kafka/producer/kafka.py21
2 files changed, 20 insertions, 7 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 231fc8a..a55bec1 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -391,6 +391,12 @@ class KafkaConsumer(six.Iterator):
self._subscription.subscribe(topics=topics)
self._client.set_topics(topics)
+ def bootstrap_connected(self):
+ """Return True if the bootstrap is connected."""
+ if self._client._bootstrap_fails > 0:
+ return False
+ return True
+
def assign(self, partitions):
"""Manually assign a list of TopicPartitions to this consumer.
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index e6bd3b9..95e797a 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -23,7 +23,6 @@ from kafka.record.legacy_records import LegacyRecordBatchBuilder
from kafka.serializer import Serializer
from kafka.structs import TopicPartition
-
log = logging.getLogger(__name__)
PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger()
@@ -376,13 +375,13 @@ class KafkaProducer(object):
reporters = [reporter() for reporter in self.config['metric_reporters']]
self._metrics = Metrics(metric_config, reporters)
- client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
- wakeup_timeout_ms=self.config['max_block_ms'],
- **self.config)
+ self._client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
+ wakeup_timeout_ms=self.config['max_block_ms'],
+ **self.config)
# Get auto-discovered version from client if necessary
if self.config['api_version'] is None:
- self.config['api_version'] = client.config['api_version']
+ self.config['api_version'] = self._client.config['api_version']
if self.config['compression_type'] == 'lz4':
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
@@ -398,9 +397,9 @@ class KafkaProducer(object):
message_version = self._max_usable_produce_magic()
self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
- self._metadata = client.cluster
+ self._metadata = self._client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
- self._sender = Sender(client, self._metadata,
+ self._sender = Sender(self._client, self._metadata,
self._accumulator, self._metrics,
guarantee_message_order=guarantee_message_order,
**self.config)
@@ -412,14 +411,22 @@ class KafkaProducer(object):
atexit.register(self._cleanup)
log.debug("Kafka producer started")
+ def bootstrap_connected(self):
+ """Return True if the bootstrap is connected."""
+ if self._client._bootstrap_fails > 0:
+ return False
+ return True
+
def _cleanup_factory(self):
"""Build a cleanup clojure that doesn't increase our ref count"""
_self = weakref.proxy(self)
+
def wrapper():
try:
_self.close(timeout=0)
except (ReferenceError, AttributeError):
pass
+
return wrapper
def _unregister_cleanup(self):