From d2012e067c953c80406c94f98d7a69d56a543f6c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 23 Jan 2016 15:06:17 -0800 Subject: KafkaClient.add_topic() -- for use by async producer --- kafka/client_async.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index f4566c0..0e2636e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -483,6 +483,21 @@ class KafkaClient(object): self._topics = set(topics) return future + def add_topic(self, topic): + """Add a topic to the list of topics tracked via metadata. + + Arguments: + topic (str): topic to track + + Returns: + Future: resolves after metadata request/response + """ + if topic in self._topics: + return Future().success(set(self._topics)) + + self._topics.add(topic) + return self.cluster.request_update() + # request metadata update on disconnect and timedout def _maybe_refresh_metadata(self): """Send a metadata request if needed. -- cgit v1.2.1