summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/fetcher.py4
-rw-r--r--kafka/consumer/group.py22
-rw-r--r--kafka/consumer/subscription_state.py1
3 files changed, 26 insertions, 1 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 41f53aa..5cc1f9d 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -574,11 +574,13 @@ class Fetcher(six.Iterator):
# consumption paused while fetch is still in-flight
log.debug("Ignoring fetched records for partition %s"
" since it is no longer fetchable", tp)
+
elif error_type is Errors.NoError:
- fetch_offset = fetch_offsets[tp]
+ self._subscriptions.assignment[tp].highwater = highwater
# we are interested in this fetch only if the beginning
# offset matches the current consumed position
+ fetch_offset = fetch_offsets[tp]
position = self._subscriptions.assignment[tp].position
if position is None or position != fetch_offset:
log.debug("Discarding fetch response for partition %s"
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index f2991b2..009c163 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -463,6 +463,28 @@ class KafkaConsumer(six.Iterator):
offset = self._subscription.assignment[partition].position
return offset
+ def highwater(self, partition):
+ """Last known highwater offset for a partition
+
+ A highwater offset is the offset that will be assigned to the next
+ message that is produced. It may be useful for calculating lag, by
+ comparing with the reported position. Note that both position and
+ highwater refer to the *next* offset -- i.e., highwater offset is
+ one greater than the newest availabel message.
+
+ Highwater offsets are returned in FetchResponse messages, so will
+ not be available if not FetchRequests have been sent for this partition
+ yet.
+
+ Arguments:
+ partition (TopicPartition): partition to check
+
+ Returns:
+ int or None: offset if available
+ """
+ assert self._subscription.is_assigned(partition), 'Partition is not assigned'
+ return self._subscription.assignment[partition].highwater
+
def pause(self, *partitions):
"""Suspend fetching from the requested partitions.
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 0a4f0ca..c137e5b 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -340,6 +340,7 @@ class TopicPartitionState(object):
self.awaiting_reset = False # whether we are awaiting reset
self.reset_strategy = None # the reset strategy if awaitingReset is set
self._position = None # offset exposed to the user
+ self.highwater = None
def _set_position(self, offset):
assert self.has_valid_position, 'Valid position required'