summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-21 09:14:57 -0800
committerDana Powers <dana.powers@rd.io>2015-12-28 14:57:54 -0800
commitb643ba1a1ed7838625012e6f75a1ed9b35ffd022 (patch)
treedebf4f38ec0a5ff222f22c6f7c8d437ff29bfac6
parent87f0181b2c6c51318a6f950e9478d14a7efe963f (diff)
downloadkafka-python-b643ba1a1ed7838625012e6f75a1ed9b35ffd022.tar.gz
New class SubscriptionState, based on upstream Java client
-rw-r--r--kafka/consumer/subscription_state.py304
1 files changed, 304 insertions, 0 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
new file mode 100644
index 0000000..6ebd925
--- /dev/null
+++ b/kafka/consumer/subscription_state.py
@@ -0,0 +1,304 @@
+from __future__ import absolute_import
+
+import logging
+import re
+
+import six
+
+from kafka.common import IllegalStateError, OffsetAndMetadata
+from kafka.protocol.offset import OffsetResetStrategy
+
+log = logging.getLogger(__name__)
+
+
+class SubscriptionState(object):
+ """
+ A class for tracking the topics, partitions, and offsets for the consumer.
+ A partition is "assigned" either directly with assign_from_user() (manual
+ assignment) or with assign_from_subscribed() (automatic assignment from
+ subscription).
+
+ Once assigned, the partition is not considered "fetchable" until its initial
+ position has been set with seek(). Fetchable partitions track a fetch
+ position which is used to set the offset of the next fetch, and a consumed
+ position which is the last offset that has been returned to the user. You
+ can suspend fetching from a partition through pause() without affecting the
+ fetched/consumed offsets. The partition will remain unfetchable until the
+ resume() is used. You can also query the pause state independently with
+ is_paused().
+
+ Note that pause state as well as fetch/consumed positions are not preserved
+ when partition assignment is changed whether directly by the user or
+ through a group rebalance.
+
+ This class also maintains a cache of the latest commit position for each of
+ the assigned partitions. This is updated through committed() and can be used
+ to set the initial fetch position (e.g. Fetcher._reset_offset() ).
+ """
+ _SUBSCRIPTION_EXCEPTION_MESSAGE = ("Subscription to topics, partitions and"
+ " pattern are mutually exclusive")
+
+ def __init__(self, offset_reset_strategy='earliest'):
+ """Initialize a SubscriptionState instance
+
+ offset_reset_strategy: 'earliest' or 'latest', otherwise
+ exception will be raised when fetching an offset
+ that is no longer available.
+ Defaults to earliest.
+ """
+ try:
+ offset_reset_strategy = getattr(OffsetResetStrategy,
+ offset_reset_strategy.upper())
+ except AttributeError:
+ log.warning('Unrecognized offset_reset_strategy, using NONE')
+ offset_reset_strategy = OffsetResetStrategy.NONE
+ self._default_offset_reset_strategy = offset_reset_strategy
+
+ self.subscription = None # set() or None
+ self.subscribed_pattern = None # regex str or None
+ self._group_subscription = set()
+ self._user_assignment = set()
+ self.assignment = dict()
+ self.needs_partition_assignment = False
+ self.listener = None
+
+ # initialize to true for the consumers to fetch offset upon starting up
+ self.needs_fetch_committed_offsets = True
+
+ def subscribe(self, topics=(), pattern=None, listener=None):
+ """Subscribe to a list of topics, or a topic regex pattern
+
+ Partitions will be assigned via a group coordinator
+ (incompatible with assign_from_user)
+
+ Optionally include listener callback, which must be a
+ ConsumerRebalanceListener and will be called before and
+ after each rebalance operation.
+ """
+ if self._user_assignment or (topics and pattern):
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+ if not (topics or pattern):
+ raise IllegalStateError('Must provide topics or a pattern')
+
+ if pattern:
+ log.info('Subscribing to pattern: /%s/', pattern)
+ self.subscription = set()
+ self.subscribed_pattern = re.compile(pattern)
+ else:
+ self.change_subscription(topics)
+ self.listener = listener
+
+ def change_subscription(self, topics):
+ if self._user_assignment:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+ if self.subscription == set(topics):
+ log.warning("subscription unchanged by change_subscription(%s)",
+ topics)
+ return
+
+ log.info('Updating subscribed topics to: %s', topics)
+ self.subscription = set(topics)
+ self._group_subscription.update(topics)
+ self.needs_partition_assignment = True
+
+ # Remove any assigned partitions which are no longer subscribed to
+ for tp in set(self.assignment.keys()):
+ if tp.topic not in self.subscription:
+ del self.assignment[tp]
+
+ def group_subscribe(self, topics):
+ """Add topics to the current group subscription.
+
+ This is used by the group leader to ensure that it receives metadata
+ updates for all topics that any member of the group is subscribed to.
+
+ @param topics list of topics to add to the group subscription
+ """
+ if self._user_assignment:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+ self._group_subscription.update(topics)
+
+ def mark_for_reassignment(self):
+ self._group_subscription.intersection_update(self.subscription)
+ self.needs_partition_assignment = True
+
+ def assign_from_user(self, partitions):
+ """
+ Change the assignment to the specified partitions provided by the user,
+ note this is different from assign_from_subscribed()
+ whose input partitions are provided from the subscribed topics.
+
+ @param partitions: list (or iterable) of TopicPartition()
+ """
+ if self.subscription is not None:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+ self._user_assignment.clear()
+ self._user_assignment.update(partitions)
+
+ for partition in partitions:
+ if partition not in self.assignment:
+ self.add_assigned_partition(partition)
+
+ for tp in set(self.assignment.keys()) - self._user_assignment:
+ del self.assignment[tp]
+
+ self.needs_partition_assignment = False
+
+ def assign_from_subscribed(self, assignments):
+ """
+ Change the assignment to the specified partitions returned from the coordinator,
+ note this is different from {@link #assignFromUser(Collection)} which directly set the assignment from user inputs
+ """
+ if self.subscription is None:
+ raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
+
+ for tp in assignments:
+ if tp.topic not in self.subscription:
+ raise ValueError("Assigned partition %s for non-subscribed topic." % tp)
+ self.assignment.clear()
+ for tp in assignments:
+ self.add_assigned_partition(tp)
+ self.needs_partition_assignment = False
+
+ def unsubscribe(self):
+ self.subscription = None
+ self._user_assignment.clear()
+ self.assignment.clear()
+ self.needs_partition_assignment = True
+ self.subscribed_pattern = None
+
+ def group_subscription(self):
+ """Get the topic subscription for the group.
+
+ For the leader, this will include the union of all member subscriptions.
+ For followers, it is the member's subscription only.
+
+ This is used when querying topic metadata to detect metadata changes
+ that would require rebalancing (the leader fetches metadata for all
+ topics in the group so that it can do partition assignment).
+
+ @return set of topics
+ """
+ return self._group_subscription
+
+ def seek(self, partition, offset):
+ self.assignment[partition].seek(offset)
+
+ def assigned_partitions(self):
+ return set(self.assignment.keys())
+
+ def fetchable_partitions(self):
+ fetchable = set()
+ for partition, state in six.iteritems(self.assignment):
+ if state.is_fetchable():
+ fetchable.add(partition)
+ return fetchable
+
+ def partitions_auto_assigned(self):
+ return self.subscription is not None
+
+ def all_consumed_offsets(self):
+ """Returns consumed offsets as {TopicPartition: OffsetAndMetadata}"""
+ all_consumed = {}
+ for partition, state in six.iteritems(self.assignment):
+ if state.has_valid_position:
+ all_consumed[partition] = OffsetAndMetadata(state.consumed, '')
+ return all_consumed
+
+ def need_offset_reset(self, partition, offset_reset_strategy=None):
+ if offset_reset_strategy is None:
+ offset_reset_strategy = self._default_offset_reset_strategy
+ self.assignment[partition].await_reset(offset_reset_strategy)
+
+ def has_default_offset_reset_policy(self):
+ return self._default_offset_reset_strategy != OffsetResetStrategy.NONE
+
+ def is_offset_reset_needed(self, partition):
+ return self.assignment[partition].awaiting_reset
+
+ def has_all_fetch_positions(self):
+ for state in self.assignment.values():
+ if not state.has_valid_position:
+ return False
+ return True
+
+ def missing_fetch_positions(self):
+ missing = set()
+ for partition, state in six.iteritems(self.assignment):
+ if not state.has_valid_position:
+ missing.add(partition)
+ return missing
+
+ def is_assigned(self, partition):
+ return partition in self.assignment
+
+ def is_paused(self, partition):
+ return partition in self.assignment and self.assignment[partition].paused
+
+ def is_fetchable(self, partition):
+ return partition in self.assignment and self.assignment[partition].is_fetchable()
+
+ def pause(self, partition):
+ self.assignment[partition].pause()
+
+ def resume(self, partition):
+ self.assignment[partition].resume()
+
+ def add_assigned_partition(self, partition):
+ self.assignment[partition] = TopicPartitionState()
+
+
+class TopicPartitionState(object):
+ def __init__(self):
+ self.committed = None # last committed position
+ self.has_valid_position = False # whether we have valid consumed and fetched positions
+ self.paused = False # whether this partition has been paused by the user
+ self.awaiting_reset = False # whether we are awaiting reset
+ self.reset_strategy = None # the reset strategy if awaitingReset is set
+ self._consumed = None # offset exposed to the user
+ self._fetched = None # current fetch position
+
+ def _set_fetched(self, offset):
+ if not self.has_valid_position:
+ raise IllegalStateError("Cannot update fetch position without valid consumed/fetched positions")
+ self._fetched = offset
+
+ def _get_fetched(self):
+ return self._fetched
+
+ fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
+
+ def _set_consumed(self, offset):
+ if not self.has_valid_position:
+ raise IllegalStateError("Cannot update consumed position without valid consumed/fetched positions")
+ self._consumed = offset
+
+ def _get_consumed(self):
+ return self._consumed
+
+ consumed = property(_get_consumed, _set_consumed, None, "last consumed position")
+
+ def await_reset(self, strategy):
+ self.awaiting_reset = True
+ self.reset_strategy = strategy
+ self._consumed = None
+ self._fetched = None
+ self.has_valid_position = False
+
+ def seek(self, offset):
+ self._consumed = offset
+ self._fetched = offset
+ self.awaiting_reset = False
+ self.reset_strategy = None
+ self.has_valid_position = True
+
+ def pause(self):
+ self.paused = True
+
+ def resume(self):
+ self.paused = False
+
+ def is_fetchable(self):
+ return not self.paused and self.has_valid_position