diff options
Diffstat (limited to 'kafka/consumer/subscription_state.py')
-rw-r--r-- | kafka/consumer/subscription_state.py | 113 |
1 files changed, 91 insertions, 22 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 38d4571..fa36bc2 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -42,10 +42,10 @@ class SubscriptionState(object): def __init__(self, offset_reset_strategy='earliest'): """Initialize a SubscriptionState instance - offset_reset_strategy: 'earliest' or 'latest', otherwise - exception will be raised when fetching an offset - that is no longer available. - Defaults to earliest. + Keyword Arguments: + offset_reset_strategy: 'earliest' or 'latest', otherwise + exception will be raised when fetching an offset that is no + longer available. Default: 'earliest' """ try: offset_reset_strategy = getattr(OffsetResetStrategy, @@ -67,14 +67,39 @@ class SubscriptionState(object): self.needs_fetch_committed_offsets = True def subscribe(self, topics=(), pattern=None, listener=None): - """Subscribe to a list of topics, or a topic regex pattern + """Subscribe to a list of topics, or a topic regex pattern. - Partitions will be assigned via a group coordinator - (incompatible with assign_from_user) + Partitions will be dynamically assigned via a group coordinator. + Topic subscriptions are not incremental: this list will replace the + current assignment (if there is one). - Optionally include listener callback, which must be a - ConsumerRebalanceListener and will be called before and - after each rebalance operation. + This method is incompatible with assign_from_user() + + Arguments: + topics (list): List of topics for subscription. + pattern (str): Pattern to match available topics. You must provide + either topics or pattern, but not both. + listener (ConsumerRebalanceListener): Optionally include listener + callback, which will be called before and after each rebalance + operation. + + As part of group management, the consumer will keep track of the + list of consumers that belong to a particular group and will + trigger a rebalance operation if one of the following events + trigger: + + * Number of partitions change for any of the subscribed topics + * Topic is created or deleted + * An existing member of the consumer group dies + * A new member is added to the consumer group + + When any of these events are triggered, the provided listener + will be invoked first to indicate that the consumer's assignment + has been revoked, and then again when the new assignment has + been received. Note that this listener will immediately override + any listener set in a previous call to subscribe. It is + guaranteed, however, that the partitions revoked/assigned + through this interface are from topics subscribed in this call. """ if self._user_assignment or (topics and pattern): raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -93,6 +118,14 @@ class SubscriptionState(object): self.listener = listener def change_subscription(self, topics): + """Change the topic subscription. + + Arguments: + topics (list of str): topics for subscription + + Raises: + IllegalStateErrror: if assign_from_user has been used already + """ if self._user_assignment: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -117,7 +150,8 @@ class SubscriptionState(object): This is used by the group leader to ensure that it receives metadata updates for all topics that any member of the group is subscribed to. - @param topics list of topics to add to the group subscription + Arguments: + topics (list of str): topics to add to the group subscription """ if self._user_assignment: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -128,12 +162,22 @@ class SubscriptionState(object): self.needs_partition_assignment = True def assign_from_user(self, partitions): - """ - Change the assignment to the specified partitions provided by the user, - note this is different from assign_from_subscribed() - whose input partitions are provided from the subscribed topics. + """Manually assign a list of TopicPartitions to this consumer. + + This interface does not allow for incremental assignment and will + replace the previous assignment (if there was one). - @param partitions: list (or iterable) of TopicPartition() + Manual topic assignment through this method does not use the consumer's + group management functionality. As such, there will be no rebalance + operation triggered when group membership or cluster and topic metadata + change. Note that it is not possible to use both manual partition + assignment with assign() and group assignment with subscribe(). + + Arguments: + partitions (list of TopicPartition): assignment for this instance. + + Raises: + IllegalStateError: if consumer has already called subscribe() """ if self.subscription is not None: raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) @@ -175,6 +219,7 @@ class SubscriptionState(object): log.info("Updated partition assignment: %s", assignments) def unsubscribe(self): + """Clear all topic subscriptions and partition assignments""" self.subscription = None self._user_assignment.clear() self.assignment.clear() @@ -191,17 +236,32 @@ class SubscriptionState(object): that would require rebalancing (the leader fetches metadata for all topics in the group so that it can do partition assignment). - @return set of topics + Returns: + set: topics """ return self._group_subscription def seek(self, partition, offset): + """Manually specify the fetch offset for a TopicPartition. + + Overrides the fetch offsets that the consumer will use on the next + poll(). If this API is invoked for the same partition more than once, + the latest offset will be used on the next poll(). Note that you may + lose data if this API is arbitrarily used in the middle of consumption, + to reset the fetch offsets. + + Arguments: + partition (TopicPartition): partition for seek operation + offset (int): message offset in partition + """ self.assignment[partition].seek(offset) def assigned_partitions(self): + """Return set of TopicPartitions in current assignment.""" return set(self.assignment.keys()) def fetchable_partitions(self): + """Return set of TopicPartitions that should be Fetched.""" fetchable = set() for partition, state in six.iteritems(self.assignment): if state.is_fetchable(): @@ -209,6 +269,7 @@ class SubscriptionState(object): return fetchable def partitions_auto_assigned(self): + """Return True unless user supplied partitions manually.""" return self.subscription is not None def all_consumed_offsets(self): @@ -220,11 +281,18 @@ class SubscriptionState(object): return all_consumed def need_offset_reset(self, partition, offset_reset_strategy=None): + """Mark partition for offset reset using specified or default strategy. + + Arguments: + partition (TopicPartition): partition to mark + offset_reset_strategy (OffsetResetStrategy, optional) + """ if offset_reset_strategy is None: offset_reset_strategy = self._default_offset_reset_strategy self.assignment[partition].await_reset(offset_reset_strategy) def has_default_offset_reset_policy(self): + """Return True if default offset reset policy is Earliest or Latest""" return self._default_offset_reset_strategy != OffsetResetStrategy.NONE def is_offset_reset_needed(self, partition): @@ -372,8 +440,9 @@ class ConsumerRebalanceListener(object): NOTE: This method is only called before rebalances. It is not called prior to KafkaConsumer.close() - @param partitions The list of partitions that were assigned to the - consumer on the last rebalance + Arguments: + revoked (list of TopicPartition): the partitions that were assigned + to the consumer on the last rebalance """ pass @@ -389,8 +458,8 @@ class ConsumerRebalanceListener(object): their on_partitions_revoked() callback before any instance executes its on_partitions_assigned() callback. - @param partitions The list of partitions that are now assigned to the - consumer (may include partitions previously assigned - to the consumer) + Arguments: + assigned (list of TopicPartition): the partitions assigned to the + consumer (may include partitions that were previously assigned) """ pass |