summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/fetcher.py8
1 files changed, 8 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 0e822c4..e2bc892 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -336,6 +336,9 @@ class Fetcher(six.Iterator):
self._subscriptions.assignment[tp].position = next_offset
for record in self._unpack_message_set(tp, messages):
+ # Fetched compressed messages may include additional records
+ if record.offset < fetch_offset:
+ continue
drained[tp].append(record)
else:
# these records aren't next in line based on the last consumed
@@ -404,6 +407,11 @@ class Fetcher(six.Iterator):
" since it is no longer assigned", tp)
break
+ # Compressed messagesets may include earlier messages
+ # It is also possible that the user called seek()
+ elif msg.offset != self._subscriptions.assignment[tp].position:
+ continue
+
self._subscriptions.assignment[tp].position = msg.offset + 1
yield msg
else: