diff options
| author | Niklas Mollenhauer <nikeee@users.noreply.github.com> | 2017-10-07 23:43:29 +0200 | 
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2017-10-07 14:43:29 -0700 | 
| commit | 30ba2c1dbd22eff5f202bbbf2ecd8b42d242b1b0 (patch) | |
| tree | 5d48f4cb00bb90c90e72871aa2ece81049070ba2 /kafka/consumer/subscription_state.py | |
| parent | f12ff950ad2131f1bd6f5fc6bf8afc6ecd5d6628 (diff) | |
| download | kafka-python-30ba2c1dbd22eff5f202bbbf2ecd8b42d242b1b0.tar.gz | |
Add method to ensure a valid topic name (#1238)
Diffstat (limited to 'kafka/consumer/subscription_state.py')
| -rw-r--r-- | kafka/consumer/subscription_state.py | 31 | 
1 files changed, 28 insertions, 3 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 19046ae..3d4dfef 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -43,6 +43,10 @@ class SubscriptionState(object):          " (2) subscribe to topics matching a regex pattern,"          " (3) assign itself specific topic-partitions.") +    # Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29 +    _MAX_NAME_LENGTH = 249 +    _TOPIC_LEGAL_CHARS = re.compile('^[a-zA-Z0-9._-]+$') +      def __init__(self, offset_reset_strategy='earliest'):          """Initialize a SubscriptionState instance @@ -120,6 +124,24 @@ class SubscriptionState(object):              raise TypeError('listener must be a ConsumerRebalanceListener')          self.listener = listener +    def _ensure_valid_topic_name(self, topic): +        """ Ensures that the topic name is valid according to the kafka source. """ + +        # See Kafka Source: +        # https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java +        if topic is None: +            raise TypeError('All topics must not be None') +        if not isinstance(topic, six.string_types): +            raise TypeError('All topics must be strings') +        if len(topic) == 0: +            raise ValueError('All topics must be non-empty strings') +        if topic == '.' or topic == '..': +            raise ValueError('Topic name cannot be "." or ".."') +        if len(topic) > self._MAX_NAME_LENGTH: +            raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(self._MAX_NAME_LENGTH, topic)) +        if not self._TOPIC_LEGAL_CHARS.match(topic): +            raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic)) +      def change_subscription(self, topics):          """Change the topic subscription. @@ -128,7 +150,10 @@ class SubscriptionState(object):          Raises:              IllegalStateErrror: if assign_from_user has been used already -            TypeError: if a non-str topic is given +            TypeError: if a topic is None or a non-str +            ValueError: if a topic is an empty string or +                        - a topic name is '.' or '..' or +                        - a topic name does not consist of ASCII-characters/'-'/'_'/'.'          """          if self._user_assignment:              raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -141,8 +166,8 @@ class SubscriptionState(object):                          topics)              return -        if any(not isinstance(t, six.string_types) for t in topics): -            raise TypeError('All topics must be strings') +        for t in topics: +            self._ensure_valid_topic_name(t)          log.info('Updating subscribed topics to: %s', topics)          self.subscription = set(topics)  | 
