summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNing Xie <andy.xning@gmail.com>2018-08-31 21:01:46 +0800
committerDana Powers <dana.powers@gmail.com>2018-08-31 06:01:46 -0700
commita7d3063d5fa1c3cb2a76c16231bb3028a6f8cde9 (patch)
tree41acc8d3722d9e15c74b1ae8a7264250394bf9bc
parent9ac3cb1ec220ff9968a8b003b02e98dd11cc486b (diff)
downloadkafka-python-a7d3063d5fa1c3cb2a76c16231bb3028a6f8cde9.tar.gz
add support for smaller topic metadata fetch during bootstrap (#1541)
-rw-r--r--kafka/client_async.py13
-rw-r--r--kafka/conn.py6
-rw-r--r--kafka/producer/kafka.py1
3 files changed, 14 insertions, 6 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 5a16f6b..c0072ae 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -149,6 +149,7 @@ class KafkaClient(object):
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost',
+ 'bootstrap_topics_filter': set(),
'client_id': 'kafka-python-' + __version__,
'request_timeout_ms': 30000,
'connections_max_idle_ms': 9 * 60 * 1000,
@@ -236,9 +237,15 @@ class KafkaClient(object):
self._last_bootstrap = time.time()
if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
- metadata_request = MetadataRequest[0]([])
+ if self.config['bootstrap_topics_filter']:
+ metadata_request = MetadataRequest[0](list(self.config['bootstrap_topics_filter']))
+ else:
+ metadata_request = MetadataRequest[0]([])
else:
- metadata_request = MetadataRequest[1](None)
+ if self.config['bootstrap_topics_filter']:
+ metadata_request = MetadataRequest[1](list(self.config['bootstrap_topics_filter']))
+ else:
+ metadata_request = MetadataRequest[1](None)
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
@@ -830,7 +837,7 @@ class KafkaClient(object):
self._refresh_on_disconnects = False
try:
remaining = end - time.time()
- version = conn.check_version(timeout=remaining, strict=strict)
+ version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request
diff --git a/kafka/conn.py b/kafka/conn.py
index a2d5ee6..122297b 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -892,7 +892,7 @@ class BrokerConnection(object):
# so if all else fails, choose that
return (0, 10, 0)
- def check_version(self, timeout=2, strict=False):
+ def check_version(self, timeout=2, strict=False, topics=[]):
"""Attempt to guess the broker version.
Note: This is a blocking call.
@@ -925,7 +925,7 @@ class BrokerConnection(object):
((0, 9), ListGroupsRequest[0]()),
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
- ((0, 8, 0), MetadataRequest[0]([])),
+ ((0, 8, 0), MetadataRequest[0](topics)),
]
for version, request in test_cases:
@@ -941,7 +941,7 @@ class BrokerConnection(object):
# the attempt to write to a disconnected socket should
# immediately fail and allow us to infer that the prior
# request was unrecognized
- mr = self.send(MetadataRequest[0]([]))
+ mr = self.send(MetadataRequest[0](topics))
selector = self.config['selector']()
selector.register(self._sock, selectors.EVENT_READ)
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 719acef..d8fb5dc 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -281,6 +281,7 @@ class KafkaProducer(object):
'key_serializer': None,
'value_serializer': None,
'acks': 1,
+ 'bootstrap_topics_filter': set(),
'compression_type': None,
'retries': 0,
'batch_size': 16384,