summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py6
-rw-r--r--kafka/cluster.py6
-rw-r--r--kafka/consumer/group.py21
3 files changed, 23 insertions, 10 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 04bdf36..029a419 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -514,8 +514,12 @@ class KafkaClient(object):
node_id = self.least_loaded_node()
+ topics = list(self._topics)
+ if self.cluster.need_all_topic_metadata:
+ topics = []
+
if self._can_send_request(node_id):
- request = MetadataRequest(list(self._topics))
+ request = MetadataRequest(topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request)
future.add_callback(self.cluster.update_metadata)
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 863b0c2..69cc02e 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -32,6 +32,7 @@ class ClusterMetadata(object):
self._need_update = False
self._future = None
self._listeners = set()
+ self.need_all_topic_metadata = False
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -86,11 +87,12 @@ class ClusterMetadata(object):
return max(ttl, next_retry, 0)
def request_update(self):
- """
- Flags metadata for update, return Future()
+ """Flags metadata for update, return Future()
Actual update must be handled separately. This method will only
change the reported ttl()
+
+ Returns: Future (value will be this cluster object after update)
"""
self._need_update = True
if not self._future or self._future.is_done:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 009c163..d77a27a 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -350,14 +350,21 @@ class KafkaConsumer(six.Iterator):
return committed
def topics(self):
- """Get all topic metadata topics the user is authorized to view.
-
- [Not Implemented Yet]
+ """Get all topics the user is authorized to view.
Returns:
- {topic: [partition_info]}
+ set: topics
"""
- raise NotImplementedError('TODO')
+ cluster = self._client.cluster
+ if self._client._metadata_refresh_in_progress and self._client._topics:
+ future = cluster.request_update()
+ self._client.poll(future=future)
+ stash = cluster.need_all_topic_metadata
+ cluster.need_all_topic_metadata = True
+ future = cluster.request_update()
+ self._client.poll(future=future)
+ cluster.need_all_topic_metadata = stash
+ return cluster.topics()
def partitions_for_topic(self, topic):
"""Get metadata about the partitions for a given topic.
@@ -596,7 +603,7 @@ class KafkaConsumer(six.Iterator):
listener=listener)
# regex will need all topic metadata
if pattern is not None:
- self._client.cluster.need_metadata_for_all = True
+ self._client.set_topics([])
log.debug("Subscribed to topic pattern: %s", topics)
else:
self._client.set_topics(self._subscription.group_subscription())
@@ -614,7 +621,7 @@ class KafkaConsumer(six.Iterator):
"""Unsubscribe from all topics and clear all assigned partitions."""
self._subscription.unsubscribe()
self._coordinator.close()
- self._client.cluster.need_metadata_for_all_topics = False
+ self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")
def _update_fetch_positions(self, partitions):