summaryrefslogtreecommitdiff
path: root/kafka/consumer/subscription_state.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/subscription_state.py')
-rw-r--r--kafka/consumer/subscription_state.py113
1 files changed, 91 insertions, 22 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 38d4571..fa36bc2 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -42,10 +42,10 @@ class SubscriptionState(object):
def __init__(self, offset_reset_strategy='earliest'):
"""Initialize a SubscriptionState instance
- offset_reset_strategy: 'earliest' or 'latest', otherwise
- exception will be raised when fetching an offset
- that is no longer available.
- Defaults to earliest.
+ Keyword Arguments:
+ offset_reset_strategy: 'earliest' or 'latest', otherwise
+ exception will be raised when fetching an offset that is no
+ longer available. Default: 'earliest'
"""
try:
offset_reset_strategy = getattr(OffsetResetStrategy,
@@ -67,14 +67,39 @@ class SubscriptionState(object):
self.needs_fetch_committed_offsets = True
def subscribe(self, topics=(), pattern=None, listener=None):
- """Subscribe to a list of topics, or a topic regex pattern
+ """Subscribe to a list of topics, or a topic regex pattern.
- Partitions will be assigned via a group coordinator
- (incompatible with assign_from_user)
+ Partitions will be dynamically assigned via a group coordinator.
+ Topic subscriptions are not incremental: this list will replace the
+ current assignment (if there is one).
- Optionally include listener callback, which must be a
- ConsumerRebalanceListener and will be called before and
- after each rebalance operation.
+ This method is incompatible with assign_from_user()
+
+ Arguments:
+ topics (list): List of topics for subscription.
+ pattern (str): Pattern to match available topics. You must provide
+ either topics or pattern, but not both.
+ listener (ConsumerRebalanceListener): Optionally include listener
+ callback, which will be called before and after each rebalance
+ operation.
+
+ As part of group management, the consumer will keep track of the
+ list of consumers that belong to a particular group and will
+ trigger a rebalance operation if one of the following events
+ trigger:
+
+ * Number of partitions change for any of the subscribed topics
+ * Topic is created or deleted
+ * An existing member of the consumer group dies
+ * A new member is added to the consumer group
+
+ When any of these events are triggered, the provided listener
+ will be invoked first to indicate that the consumer's assignment
+ has been revoked, and then again when the new assignment has
+ been received. Note that this listener will immediately override
+ any listener set in a previous call to subscribe. It is
+ guaranteed, however, that the partitions revoked/assigned
+ through this interface are from topics subscribed in this call.
"""
if self._user_assignment or (topics and pattern):
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
@@ -93,6 +118,14 @@ class SubscriptionState(object):
self.listener = listener
def change_subscription(self, topics):
+ """Change the topic subscription.
+
+ Arguments:
+ topics (list of str): topics for subscription
+
+ Raises:
+ IllegalStateErrror: if assign_from_user has been used already
+ """
if self._user_assignment:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
@@ -117,7 +150,8 @@ class SubscriptionState(object):
This is used by the group leader to ensure that it receives metadata
updates for all topics that any member of the group is subscribed to.
- @param topics list of topics to add to the group subscription
+ Arguments:
+ topics (list of str): topics to add to the group subscription
"""
if self._user_assignment:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
@@ -128,12 +162,22 @@ class SubscriptionState(object):
self.needs_partition_assignment = True
def assign_from_user(self, partitions):
- """
- Change the assignment to the specified partitions provided by the user,
- note this is different from assign_from_subscribed()
- whose input partitions are provided from the subscribed topics.
+ """Manually assign a list of TopicPartitions to this consumer.
+
+ This interface does not allow for incremental assignment and will
+ replace the previous assignment (if there was one).
- @param partitions: list (or iterable) of TopicPartition()
+ Manual topic assignment through this method does not use the consumer's
+ group management functionality. As such, there will be no rebalance
+ operation triggered when group membership or cluster and topic metadata
+ change. Note that it is not possible to use both manual partition
+ assignment with assign() and group assignment with subscribe().
+
+ Arguments:
+ partitions (list of TopicPartition): assignment for this instance.
+
+ Raises:
+ IllegalStateError: if consumer has already called subscribe()
"""
if self.subscription is not None:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
@@ -175,6 +219,7 @@ class SubscriptionState(object):
log.info("Updated partition assignment: %s", assignments)
def unsubscribe(self):
+ """Clear all topic subscriptions and partition assignments"""
self.subscription = None
self._user_assignment.clear()
self.assignment.clear()
@@ -191,17 +236,32 @@ class SubscriptionState(object):
that would require rebalancing (the leader fetches metadata for all
topics in the group so that it can do partition assignment).
- @return set of topics
+ Returns:
+ set: topics
"""
return self._group_subscription
def seek(self, partition, offset):
+ """Manually specify the fetch offset for a TopicPartition.
+
+ Overrides the fetch offsets that the consumer will use on the next
+ poll(). If this API is invoked for the same partition more than once,
+ the latest offset will be used on the next poll(). Note that you may
+ lose data if this API is arbitrarily used in the middle of consumption,
+ to reset the fetch offsets.
+
+ Arguments:
+ partition (TopicPartition): partition for seek operation
+ offset (int): message offset in partition
+ """
self.assignment[partition].seek(offset)
def assigned_partitions(self):
+ """Return set of TopicPartitions in current assignment."""
return set(self.assignment.keys())
def fetchable_partitions(self):
+ """Return set of TopicPartitions that should be Fetched."""
fetchable = set()
for partition, state in six.iteritems(self.assignment):
if state.is_fetchable():
@@ -209,6 +269,7 @@ class SubscriptionState(object):
return fetchable
def partitions_auto_assigned(self):
+ """Return True unless user supplied partitions manually."""
return self.subscription is not None
def all_consumed_offsets(self):
@@ -220,11 +281,18 @@ class SubscriptionState(object):
return all_consumed
def need_offset_reset(self, partition, offset_reset_strategy=None):
+ """Mark partition for offset reset using specified or default strategy.
+
+ Arguments:
+ partition (TopicPartition): partition to mark
+ offset_reset_strategy (OffsetResetStrategy, optional)
+ """
if offset_reset_strategy is None:
offset_reset_strategy = self._default_offset_reset_strategy
self.assignment[partition].await_reset(offset_reset_strategy)
def has_default_offset_reset_policy(self):
+ """Return True if default offset reset policy is Earliest or Latest"""
return self._default_offset_reset_strategy != OffsetResetStrategy.NONE
def is_offset_reset_needed(self, partition):
@@ -372,8 +440,9 @@ class ConsumerRebalanceListener(object):
NOTE: This method is only called before rebalances. It is not called
prior to KafkaConsumer.close()
- @param partitions The list of partitions that were assigned to the
- consumer on the last rebalance
+ Arguments:
+ revoked (list of TopicPartition): the partitions that were assigned
+ to the consumer on the last rebalance
"""
pass
@@ -389,8 +458,8 @@ class ConsumerRebalanceListener(object):
their on_partitions_revoked() callback before any instance executes its
on_partitions_assigned() callback.
- @param partitions The list of partitions that are now assigned to the
- consumer (may include partitions previously assigned
- to the consumer)
+ Arguments:
+ assigned (list of TopicPartition): the partitions assigned to the
+ consumer (may include partitions that were previously assigned)
"""
pass