summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-15 16:17:22 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-15 16:17:22 -0800
commitb4c36ee3d3a6296bb7039f1720d4f6f319a69d96 (patch)
tree33dbdbf383647a38e495c823e22bd47e37084102 /kafka/cluster.py
parent89b4c2ff37205f9b76be6d398d75ec5214919468 (diff)
downloadkafka-python-b4c36ee3d3a6296bb7039f1720d4f6f319a69d96.tar.gz
Track set of unauthorized_topics in ClusterMetadata
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py3
1 files changed, 3 insertions, 0 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index a0053d6..4e0b94e 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -32,6 +32,7 @@ class ClusterMetadata(object):
self._future = None
self._listeners = set()
self.need_all_topic_metadata = False
+ self.unauthorized_topics = set()
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -129,6 +130,7 @@ class ClusterMetadata(object):
# but retain LeaderNotAvailable because it means topic is initializing
self._partitions.clear()
self._broker_partitions.clear()
+ self.unauthorized_topics.clear()
for error_code, topic, partitions in metadata.topics:
error_type = Errors.for_code(error_code)
@@ -147,6 +149,7 @@ class ClusterMetadata(object):
log.error("Topic %s not found in cluster metadata", topic)
elif error_type is Errors.TopicAuthorizationFailedError:
log.error("Topic %s is not authorized for this client", topic)
+ self.unauthorized_topics.add(topic)
elif error_type is Errors.InvalidTopicError:
log.error("'%s' is not a valid topic name", topic)
else: