diff options
-rw-r--r-- | kafka/consumer/fetcher.py | 8 |
1 files changed, 8 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 0e822c4..e2bc892 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -336,6 +336,9 @@ class Fetcher(six.Iterator): self._subscriptions.assignment[tp].position = next_offset for record in self._unpack_message_set(tp, messages): + # Fetched compressed messages may include additional records + if record.offset < fetch_offset: + continue drained[tp].append(record) else: # these records aren't next in line based on the last consumed @@ -404,6 +407,11 @@ class Fetcher(six.Iterator): " since it is no longer assigned", tp) break + # Compressed messagesets may include earlier messages + # It is also possible that the user called seek() + elif msg.offset != self._subscriptions.assignment[tp].position: + continue + self._subscriptions.assignment[tp].position = msg.offset + 1 yield msg else: |