diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/fetcher.py | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index f780fb2..66b6df0 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -683,7 +683,9 @@ class Fetcher(six.Iterator): self._subscriptions.assignment[tp].highwater = highwater # we are interested in this fetch only if the beginning - # offset matches the current consumed position + # offset (of the *request*) matches the current consumed position + # Note that the *response* may return a messageset that starts + # earlier (e.g., compressed messages) or later (e.g., compacted topic) fetch_offset = fetch_offsets[tp] position = self._subscriptions.assignment[tp].position if position is None or position != fetch_offset: |