From a7d3063d5fa1c3cb2a76c16231bb3028a6f8cde9 Mon Sep 17 00:00:00 2001 From: Ning Xie Date: Fri, 31 Aug 2018 21:01:46 +0800 Subject: add support for smaller topic metadata fetch during bootstrap (#1541) --- kafka/client_async.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index 5a16f6b..c0072ae 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -149,6 +149,7 @@ class KafkaClient(object): DEFAULT_CONFIG = { 'bootstrap_servers': 'localhost', + 'bootstrap_topics_filter': set(), 'client_id': 'kafka-python-' + __version__, 'request_timeout_ms': 30000, 'connections_max_idle_ms': 9 * 60 * 1000, @@ -236,9 +237,15 @@ class KafkaClient(object): self._last_bootstrap = time.time() if self.config['api_version'] is None or self.config['api_version'] < (0, 10): - metadata_request = MetadataRequest[0]([]) + if self.config['bootstrap_topics_filter']: + metadata_request = MetadataRequest[0](list(self.config['bootstrap_topics_filter'])) + else: + metadata_request = MetadataRequest[0]([]) else: - metadata_request = MetadataRequest[1](None) + if self.config['bootstrap_topics_filter']: + metadata_request = MetadataRequest[1](list(self.config['bootstrap_topics_filter'])) + else: + metadata_request = MetadataRequest[1](None) for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) @@ -830,7 +837,7 @@ class KafkaClient(object): self._refresh_on_disconnects = False try: remaining = end - time.time() - version = conn.check_version(timeout=remaining, strict=strict) + version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter'])) return version except Errors.NodeNotReadyError: # Only raise to user if this is a node-specific request -- cgit v1.2.1