summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-07-16 12:07:03 -0700
committerDana Powers <dana.powers@gmail.com>2016-07-16 20:25:19 -0700
commit277f0ddd61c230181f5f21d427070ec44b36a257 (patch)
tree2bff2baad9cd255cefff738d602d487e1c33443b
parent9960f3d8d2902ae0bb57262a6e530ed219168b2c (diff)
downloadkafka-python-metadata_v1.tar.gz
Use MetadataRequest v1 for 0.10+ api_versionmetadata_v1
-rw-r--r--kafka/client_async.py17
-rw-r--r--kafka/cluster.py43
2 files changed, 51 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 6fa9434..e064d51 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -178,7 +178,11 @@ class KafkaClient(object):
time.sleep(next_at - now)
self._last_bootstrap = time.time()
- metadata_request = MetadataRequest[0]([])
+ if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
+ metadata_request = MetadataRequest[0]([])
+ else:
+ metadata_request = MetadataRequest[1](None)
+
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
cb = functools.partial(self._conn_state_change, 'bootstrap')
@@ -643,10 +647,17 @@ class KafkaClient(object):
topics = list(self._topics)
if self.cluster.need_all_topic_metadata:
- topics = []
+ if self.config['api_version'] < (0, 10):
+ topics = []
+ else:
+ topics = None
if self._can_send_request(node_id):
- request = MetadataRequest[0](topics)
+ if self.config['api_version'] < (0, 10):
+ api_version = 0
+ else:
+ api_version = 1
+ request = MetadataRequest[api_version](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request)
future.add_callback(self.cluster.update_metadata)
diff --git a/kafka/cluster.py b/kafka/cluster.py
index c3b8f3c..694e115 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -34,6 +34,8 @@ class ClusterMetadata(object):
self._lock = threading.Lock()
self.need_all_topic_metadata = False
self.unauthorized_topics = set()
+ self.internal_topics = set()
+ self.controller = None
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -150,13 +152,23 @@ class ClusterMetadata(object):
self._future = Future()
return self._future
- def topics(self):
+ def topics(self, exclude_internal_topics=True):
"""Get set of known topics.
+ Arguments:
+ exclude_internal_topics (bool): Whether records from internal topics
+ (such as offsets) should be exposed to the consumer. If set to
+ True the only way to receive records from an internal topic is
+ subscribing to it. Default True
+
Returns:
set: {topic (str), ...}
"""
- return set(self._partitions.keys())
+ topics = set(self._partitions.keys())
+ if exclude_internal_topics:
+ return topics - self.internal_topics
+ else:
+ return topics
def failed_update(self, exception):
"""Update cluster state given a failed MetadataRequest."""
@@ -180,23 +192,41 @@ class ClusterMetadata(object):
# In the common case where we ask for a single topic and get back an
# error, we should fail the future
if len(metadata.topics) == 1 and metadata.topics[0][0] != 0:
- error_code, topic, _ = metadata.topics[0]
+ error_code, topic = metadata.topics[0][:2]
error = Errors.for_code(error_code)(topic)
return self.failed_update(error)
if not metadata.brokers:
log.warning("No broker metadata found in MetadataResponse")
- for node_id, host, port in metadata.brokers:
+ for broker in metadata.brokers:
+ if metadata.API_VERSION == 0:
+ node_id, host, port = broker
+ rack = None
+ else:
+ node_id, host, port, rack = broker
self._brokers.update({
- node_id: BrokerMetadata(node_id, host, port, None)
+ node_id: BrokerMetadata(node_id, host, port, rack)
})
+ if metadata.API_VERSION == 0:
+ self.controller = None
+ else:
+ self.controller = self._brokers.get(metadata.controller_id)
+
_new_partitions = {}
_new_broker_partitions = collections.defaultdict(set)
_new_unauthorized_topics = set()
+ _new_internal_topics = set()
- for error_code, topic, partitions in metadata.topics:
+ for topic_data in metadata.topics:
+ if metadata.API_VERSION == 0:
+ error_code, topic, partitions = topic_data
+ is_internal = False
+ else:
+ error_code, topic, is_internal, partitions = topic_data
+ if is_internal:
+ _new_internal_topics.add(topic)
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
_new_partitions[topic] = {}
@@ -226,6 +256,7 @@ class ClusterMetadata(object):
self._partitions = _new_partitions
self._broker_partitions = _new_broker_partitions
self.unauthorized_topics = _new_unauthorized_topics
+ self.internal_topics = _new_internal_topics
f = None
if self._future:
f = self._future