summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py31
1 files changed, 19 insertions, 12 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index d77a27a..65bb670 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -594,20 +594,26 @@ class KafkaConsumer(six.Iterator):
any listener set in a previous call to subscribe. It is
guaranteed, however, that the partitions revoked/assigned
through this interface are from topics subscribed in this call.
+
+ Raises:
+ IllegalStateError: if called after previously calling assign()
+ AssertionError: if neither topics or pattern is provided
+ TypeError: if listener is not a ConsumerRebalanceListener
"""
- if not topics:
- self.unsubscribe()
+ # SubscriptionState handles error checking
+ self._subscription.subscribe(topics=topics,
+ pattern=pattern,
+ listener=listener)
+
+ # regex will need all topic metadata
+ if pattern is not None:
+ self._client.cluster.need_all_topic_metadata = True
+ self._client.set_topics([])
+ log.debug("Subscribed to topic pattern: %s", pattern)
else:
- self._subscription.subscribe(topics=topics,
- pattern=pattern,
- listener=listener)
- # regex will need all topic metadata
- if pattern is not None:
- self._client.set_topics([])
- log.debug("Subscribed to topic pattern: %s", topics)
- else:
- self._client.set_topics(self._subscription.group_subscription())
- log.debug("Subscribed to topic(s): %s", topics)
+ self._client.cluster.need_all_topic_metadata = False
+ self._client.set_topics(self._subscription.group_subscription())
+ log.debug("Subscribed to topic(s): %s", topics)
def subscription(self):
"""Get the current topic subscription.
@@ -621,6 +627,7 @@ class KafkaConsumer(six.Iterator):
"""Unsubscribe from all topics and clear all assigned partitions."""
self._subscription.unsubscribe()
self._coordinator.close()
+ self._client.cluster.need_all_topic_metadata = False
self._client.set_topics([])
log.debug("Unsubscribed all topics or patterns and assigned partitions")