summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@rd.io> 2015-12-28 14:54:50 -0800
committer: Dana Powers <dana.powers@rd.io> 2015-12-28 14:57:54 -0800
commit: 9820c5d55398bdb49ffbcd0e6a997bde9f8891fe (patch)
tree: ada738c36033dc03de57fbfafc101c4eacb20c5b
parent: 35eb8c5eaadbbb81f5e553d0ab10c5221a675378 (diff)
download: kafka-python-9820c5d55398bdb49ffbcd0e6a997bde9f8891fe.tar.gz
Define ConsumerRebalanceListener abstract class
-rw-r--r-- kafka/consumer/subscription_state.py 84
1 files changed, 84 insertions, 0 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 6ebd925..a90d9b3 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -1,5 +1,6 @@
from __future__ import absolute_import
+import abc
import logging
import re
@@ -86,6 +87,9 @@ class SubscriptionState(object):
self.subscribed_pattern = re.compile(pattern)
else:
self.change_subscription(topics)
+
+ if listener and not isinstance(listener, ConsumerRebalanceListener):
+ raise TypeError('listener must be a ConsumerRebalanceListener')
self.listener = listener
def change_subscription(self, topics):
@@ -302,3 +306,83 @@ class TopicPartitionState(object):
def is_fetchable(self):
return not self.paused and self.has_valid_position
+
+
+class ConsumerRebalanceListener(abc.ABCMeta(str('ABC'), (object,), {})):
+    """
+    A callback interface that the user can implement to trigger custom actions
+    when the set of partitions assigned to the consumer changes.
+
+    This is applicable when the consumer is having Kafka auto-manage group
+    membership. If the consumer directly assigns partitions, those
+    partitions will never be reassigned and this callback is not applicable.
+
+    When Kafka is managing the group membership, a partition re-assignment will
+    be triggered any time the members of the group change or the subscription
+    of the members changes. This can occur when processes die, new process
+    instances are added or old instances come back to life after failure.
+    Rebalances can also be triggered by changes affecting the subscribed
+    topics (e.g. when the number of partitions is administratively adjusted).
+
+    There are many uses for this functionality. One common use is saving offsets
+    in a custom store. By saving offsets in the on_partitions_revoked() call, we
+    can ensure that any time partition assignment changes the offset gets saved.
+
+    Another use is flushing out any kind of cache of intermediate results the
+    consumer may be keeping. For example, consider a case where the consumer is
+    subscribed to a topic containing user page views, and the goal is to count
+    the number of page views per user for each five minute window. Let's say
+    the topic is partitioned by the user id so that all events for a particular
+    user will go to a single consumer instance. The consumer can keep in memory
+    a running tally of actions per user and only flush these out to a remote
+    data store when its cache gets too big. However if a partition is reassigned
+    it may want to automatically trigger a flush of this cache, before the new
+    owner takes over consumption.
+
+    This callback will execute in the user thread as part of Consumer.poll()
+    whenever partition assignment changes.
+
+    It is guaranteed that all consumer processes will invoke
+    on_partitions_revoked() prior to any process invoking
+    on_partitions_assigned(). So if offsets or other state is saved in the
+    on_partitions_revoked() call, it should be saved by the time the process
+    taking over that partition has its on_partitions_assigned() callback
+    called to load the state.
+    """
+    # Metaclass comes from the base class; `__metaclass__` is ignored on Py3.
+
+    @abc.abstractmethod
+    def on_partitions_revoked(self, revoked):
+        """
+        A callback method the user can implement to provide handling of offset
+        commits to a customized store on the start of a rebalance operation.
+        This method will be called before a rebalance operation starts and
+        after the consumer stops fetching data. It is recommended that offsets
+        should be committed in this callback to either Kafka or a custom offset
+        store to prevent duplicate data.
+
+        NOTE: This method is only called before rebalances. It is not called
+        prior to KafkaConsumer.close()
+
+        @param revoked The list of partitions that were assigned to the
+                       consumer on the last rebalance
+        """
+        pass
+
+    @abc.abstractmethod
+    def on_partitions_assigned(self, assigned):
+        """
+        A callback method the user can implement to provide handling of
+        customized offsets on completion of a successful partition
+        re-assignment. This method will be called after a partition
+        re-assignment completes and before the consumer starts fetching data.
+
+        It is guaranteed that all the processes in a consumer group will execute
+        their on_partitions_revoked() callback before any instance executes its
+        on_partitions_assigned() callback.
+
+        @param assigned The list of partitions that are now assigned to the
+                        consumer (may include partitions previously assigned
+                        to the consumer)
+        """
+        pass