summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/subscription_state.py7
1 files changed, 7 insertions, 0 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index fac1a98..4366010 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -128,15 +128,22 @@ class SubscriptionState(object):
Raises:
IllegalStateErrror: if assign_from_user has been used already
+ TypeError: if a non-str topic is given
"""
if self._user_assignment:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+ if isinstance(topics, str):
+ topics = [topics]
+
if self.subscription == set(topics):
log.warning("subscription unchanged by change_subscription(%s)",
topics)
return
+ if any(not isinstance(t, str) for t in topics):
+ raise TypeError('All topics must be strings')
+
log.info('Updating subscribed topics to: %s', topics)
self.subscription = set(topics)
self._group_subscription.update(topics)