diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-28 14:54:50 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-28 14:57:54 -0800 |
commit | 9820c5d55398bdb49ffbcd0e6a997bde9f8891fe (patch) | |
tree | ada738c36033dc03de57fbfafc101c4eacb20c5b | |
parent | 35eb8c5eaadbbb81f5e553d0ab10c5221a675378 (diff) | |
download | kafka-python-9820c5d55398bdb49ffbcd0e6a997bde9f8891fe.tar.gz |
Define ConsumerRebalanceListener abstract class
-rw-r--r-- | kafka/consumer/subscription_state.py | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 6ebd925..a90d9b3 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -1,5 +1,6 @@ from __future__ import absolute_import +import abc import logging import re @@ -86,6 +87,9 @@ class SubscriptionState(object): self.subscribed_pattern = re.compile(pattern) else: self.change_subscription(topics) + + if listener and not isinstance(listener, ConsumerRebalanceListener): + raise TypeError('listener must be a ConsumerRebalanceListener') self.listener = listener def change_subscription(self, topics): @@ -302,3 +306,83 @@ class TopicPartitionState(object): def is_fetchable(self): return not self.paused and self.has_valid_position + + +class ConsumerRebalanceListener(object): + """ + A callback interface that the user can implement to trigger custom actions + when the set of partitions assigned to the consumer changes. + + This is applicable when the consumer is having Kafka auto-manage group + membership. If the consumer's directly assign partitions, those + partitions will never be reassigned and this callback is not applicable. + + When Kafka is managing the group membership, a partition re-assignment will + be triggered any time the members of the group changes or the subscription + of the members changes. This can occur when processes die, new process + instances are added or old instances come back to life after failure. + Rebalances can also be triggered by changes affecting the subscribed + topics (e.g. when then number of partitions is administratively adjusted). + + There are many uses for this functionality. One common use is saving offsets + in a custom store. By saving offsets in the on_partitions_revoked(), call we + can ensure that any time partition assignment changes the offset gets saved. + + Another use is flushing out any kind of cache of intermediate results the + consumer may be keeping. For example, consider a case where the consumer is + subscribed to a topic containing user page views, and the goal is to count + the number of page views per users for each five minute window. Let's say + the topic is partitioned by the user id so that all events for a particular + user will go to a single consumer instance. The consumer can keep in memory + a running tally of actions per user and only flush these out to a remote + data store when its cache gets too big. However if a partition is reassigned + it may want to automatically trigger a flush of this cache, before the new + owner takes over consumption. + + This callback will execute in the user thread as part of the Consumer.poll() + whenever partition assignment changes. + + It is guaranteed that all consumer processes will invoke + on_partitions_revoked() prior to any process invoking + on_partitions_assigned(). So if offsets or other state is saved in the + on_partitions_revoked() call, it should be saved by the time the process + taking over that partition has their on_partitions_assigned() callback + called to load the state. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def on_partitions_revoked(self, revoked): + """ + A callback method the user can implement to provide handling of offset + commits to a customized store on the start of a rebalance operation. + This method will be called before a rebalance operation starts and + after the consumer stops fetching data. It is recommended that offsets + should be committed in this callback to either Kafka or a custom offset + store to prevent duplicate data. + + NOTE: This method is only called before rebalances. It is not called + prior to KafkaConsumer.close() + + @param partitions The list of partitions that were assigned to the + consumer on the last rebalance + """ + pass + + @abc.abstractmethod + def on_partitions_assigned(self, assigned): + """ + A callback method the user can implement to provide handling of + customized offsets on completion of a successful partition + re-assignment. This method will be called after an offset re-assignment + completes and before the consumer starts fetching data. + + It is guaranteed that all the processes in a consumer group will execute + their on_partitions_revoked() callback before any instance executes its + on_partitions_assigned() callback. + + @param partitions The list of partitions that are now assigned to the + consumer (may include partitions previously assigned + to the consumer) + """ + pass |