summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py22
1 files changed, 22 insertions, 0 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index f2991b2..009c163 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -463,6 +463,28 @@ class KafkaConsumer(six.Iterator):
offset = self._subscription.assignment[partition].position
return offset
+ def highwater(self, partition):
+ """Last known highwater offset for a partition
+
+ A highwater offset is the offset that will be assigned to the next
+ message that is produced. It may be useful for calculating lag, by
+ comparing with the reported position. Note that both position and
+ highwater refer to the *next* offset -- i.e., highwater offset is
+ one greater than the newest availabel message.
+
+ Highwater offsets are returned in FetchResponse messages, so will
+ not be available if not FetchRequests have been sent for this partition
+ yet.
+
+ Arguments:
+ partition (TopicPartition): partition to check
+
+ Returns:
+ int or None: offset if available
+ """
+ assert self._subscription.is_assigned(partition), 'Partition is not assigned'
+ return self._subscription.assignment[partition].highwater
+
def pause(self, *partitions):
"""Suspend fetching from the requested partitions.