diff options
author | Ning Xie <andy.xning@gmail.com> | 2018-08-31 21:01:46 +0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2018-08-31 06:01:46 -0700 |
commit | a7d3063d5fa1c3cb2a76c16231bb3028a6f8cde9 (patch) | |
tree | 41acc8d3722d9e15c74b1ae8a7264250394bf9bc | |
parent | 9ac3cb1ec220ff9968a8b003b02e98dd11cc486b (diff) | |
download | kafka-python-a7d3063d5fa1c3cb2a76c16231bb3028a6f8cde9.tar.gz |
add support for smaller topic metadata fetch during bootstrap (#1541)
-rw-r--r-- | kafka/client_async.py | 13 | ||||
-rw-r--r-- | kafka/conn.py | 6 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 1 |
3 files changed, 14 insertions, 6 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 5a16f6b..c0072ae 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -149,6 +149,7 @@ class KafkaClient(object): DEFAULT_CONFIG = { 'bootstrap_servers': 'localhost', + 'bootstrap_topics_filter': set(), 'client_id': 'kafka-python-' + __version__, 'request_timeout_ms': 30000, 'connections_max_idle_ms': 9 * 60 * 1000, @@ -236,9 +237,15 @@ class KafkaClient(object): self._last_bootstrap = time.time() if self.config['api_version'] is None or self.config['api_version'] < (0, 10): - metadata_request = MetadataRequest[0]([]) + if self.config['bootstrap_topics_filter']: + metadata_request = MetadataRequest[0](list(self.config['bootstrap_topics_filter'])) + else: + metadata_request = MetadataRequest[0]([]) else: - metadata_request = MetadataRequest[1](None) + if self.config['bootstrap_topics_filter']: + metadata_request = MetadataRequest[1](list(self.config['bootstrap_topics_filter'])) + else: + metadata_request = MetadataRequest[1](None) for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) @@ -830,7 +837,7 @@ class KafkaClient(object): self._refresh_on_disconnects = False try: remaining = end - time.time() - version = conn.check_version(timeout=remaining, strict=strict) + version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter'])) return version except Errors.NodeNotReadyError: # Only raise to user if this is a node-specific request diff --git a/kafka/conn.py b/kafka/conn.py index a2d5ee6..122297b 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -892,7 +892,7 @@ class BrokerConnection(object): # so if all else fails, choose that return (0, 10, 0) - def check_version(self, timeout=2, strict=False): + def check_version(self, timeout=2, strict=False, topics=[]): """Attempt to guess the broker version. Note: This is a blocking call. @@ -925,7 +925,7 @@ class BrokerConnection(object): ((0, 9), ListGroupsRequest[0]()), ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')), ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])), - ((0, 8, 0), MetadataRequest[0]([])), + ((0, 8, 0), MetadataRequest[0](topics)), ] for version, request in test_cases: @@ -941,7 +941,7 @@ class BrokerConnection(object): # the attempt to write to a disconnected socket should # immediately fail and allow us to infer that the prior # request was unrecognized - mr = self.send(MetadataRequest[0]([])) + mr = self.send(MetadataRequest[0](topics)) selector = self.config['selector']() selector.register(self._sock, selectors.EVENT_READ) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 719acef..d8fb5dc 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -281,6 +281,7 @@ class KafkaProducer(object): 'key_serializer': None, 'value_serializer': None, 'acks': 1, + 'bootstrap_topics_filter': set(), 'compression_type': None, 'retries': 0, 'batch_size': 16384, |