summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py43
1 files changed, 37 insertions, 6 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index c3b8f3c..694e115 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -34,6 +34,8 @@ class ClusterMetadata(object):
self._lock = threading.Lock()
self.need_all_topic_metadata = False
self.unauthorized_topics = set()
+ self.internal_topics = set()
+ self.controller = None
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -150,13 +152,23 @@ class ClusterMetadata(object):
self._future = Future()
return self._future
- def topics(self):
+ def topics(self, exclude_internal_topics=True):
"""Get set of known topics.
+ Arguments:
+ exclude_internal_topics (bool): Whether records from internal topics
+ (such as offsets) should be exposed to the consumer. If set to
+ True the only way to receive records from an internal topic is
+ subscribing to it. Default True
+
Returns:
set: {topic (str), ...}
"""
- return set(self._partitions.keys())
+ topics = set(self._partitions.keys())
+ if exclude_internal_topics:
+ return topics - self.internal_topics
+ else:
+ return topics
def failed_update(self, exception):
"""Update cluster state given a failed MetadataRequest."""
@@ -180,23 +192,41 @@ class ClusterMetadata(object):
# In the common case where we ask for a single topic and get back an
# error, we should fail the future
if len(metadata.topics) == 1 and metadata.topics[0][0] != 0:
- error_code, topic, _ = metadata.topics[0]
+ error_code, topic = metadata.topics[0][:2]
error = Errors.for_code(error_code)(topic)
return self.failed_update(error)
if not metadata.brokers:
log.warning("No broker metadata found in MetadataResponse")
- for node_id, host, port in metadata.brokers:
+ for broker in metadata.brokers:
+ if metadata.API_VERSION == 0:
+ node_id, host, port = broker
+ rack = None
+ else:
+ node_id, host, port, rack = broker
self._brokers.update({
- node_id: BrokerMetadata(node_id, host, port, None)
+ node_id: BrokerMetadata(node_id, host, port, rack)
})
+ if metadata.API_VERSION == 0:
+ self.controller = None
+ else:
+ self.controller = self._brokers.get(metadata.controller_id)
+
_new_partitions = {}
_new_broker_partitions = collections.defaultdict(set)
_new_unauthorized_topics = set()
+ _new_internal_topics = set()
- for error_code, topic, partitions in metadata.topics:
+ for topic_data in metadata.topics:
+ if metadata.API_VERSION == 0:
+ error_code, topic, partitions = topic_data
+ is_internal = False
+ else:
+ error_code, topic, is_internal, partitions = topic_data
+ if is_internal:
+ _new_internal_topics.add(topic)
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
_new_partitions[topic] = {}
@@ -226,6 +256,7 @@ class ClusterMetadata(object):
self._partitions = _new_partitions
self._broker_partitions = _new_broker_partitions
self.unauthorized_topics = _new_unauthorized_topics
+ self.internal_topics = _new_internal_topics
f = None
if self._future:
f = self._future