summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-01-31 23:54:34 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-01 00:10:08 -0800
commitd3d6ea939b85ca033293898e2c4c63eda2335aab (patch)
tree6e1ca915d3373a56d626f8a904417c5d45964754 /kafka/consumer/group.py
parent843b34732d3cc5593c9e03c5ea062d705086eb8c (diff)
downloadkafka-python-d3d6ea939b85ca033293898e2c4c63eda2335aab.tar.gz
Implement KafkaConsumer.topics()
- add ClusterMetadata.need_all_topic_metadata attribute - client requests metadata for all topics if attribute True
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py21
1 files changed, 14 insertions, 7 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 009c163..d77a27a 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -350,14 +350,21 @@ class KafkaConsumer(six.Iterator):
return committed
def topics(self):
- """Get all topic metadata topics the user is authorized to view.
-
- [Not Implemented Yet]
+ """Get all topics the user is authorized to view.
Returns:
- {topic: [partition_info]}
+ set: topics
"""
- raise NotImplementedError('TODO')
+ cluster = self._client.cluster
+ if self._client._metadata_refresh_in_progress and self._client._topics:
+ future = cluster.request_update()
+ self._client.poll(future=future)
+ stash = cluster.need_all_topic_metadata
+ cluster.need_all_topic_metadata = True
+ future = cluster.request_update()
+ self._client.poll(future=future)
+ cluster.need_all_topic_metadata = stash
+ return cluster.topics()
def partitions_for_topic(self, topic):
"""Get metadata about the partitions for a given topic.
@@ -596,7 +603,7 @@ class KafkaConsumer(six.Iterator):
listener=listener)
# regex will need all topic metadata
if pattern is not None:
- self._client.cluster.need_metadata_for_all = True
+ self._client.set_topics([])
log.debug("Subscribed to topic pattern: %s", topics)
else:
self._client.set_topics(self._subscription.group_subscription())
@@ -614,7 +621,7 @@ class KafkaConsumer(six.Iterator):
"""Unsubscribe from all topics and clear all assigned partitions."""
self._subscription.unsubscribe()
self._coordinator.close()
- self._client.cluster.need_metadata_for_all_topics = False
+ self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")
def _update_fetch_positions(self, partitions):