summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-23 15:06:17 -0800
committerDana Powers <dana.powers@rd.io>2016-01-23 15:06:17 -0800
commitd2012e067c953c80406c94f98d7a69d56a543f6c (patch)
tree64d74816ee5edfa1ba79d5a23f18a8b6ecfd7051 /kafka/client_async.py
parentb8c209714c3a2251c056ebeed0357055cc8e3b72 (diff)
downloadkafka-python-d2012e067c953c80406c94f98d7a69d56a543f6c.tar.gz
KafkaClient.add_topic() -- for use by async producer
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py15
1 files changed, 15 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index f4566c0..0e2636e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -483,6 +483,21 @@ class KafkaClient(object):
self._topics = set(topics)
return future
+ def add_topic(self, topic):
+ """Add a topic to the list of topics tracked via metadata.
+
+ Arguments:
+ topic (str): topic to track
+
+ Returns:
+ Future: resolves after metadata request/response
+ """
+ if topic in self._topics:
+ return Future().success(set(self._topics))
+
+ self._topics.add(topic)
+ return self.cluster.request_update()
+
# request metadata update on disconnect and timedout
def _maybe_refresh_metadata(self):
"""Send a metadata request if needed.