summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- kafka/consumer/fetcher.py 17
1 files changed, 9 insertions, 8 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c9bbb97..4f2a543 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -372,11 +372,6 @@ class Fetcher(six.Iterator):
tp, next_offset)
for record in part_records:
- # Fetched compressed messages may include additional records
- if record.offset < fetch_offset:
- log.debug("Skipping message offset: %s (expecting %s)",
- record.offset, fetch_offset)
- continue
drained[tp].append(record)
self._subscriptions.assignment[tp].position = next_offset
@@ -843,10 +838,15 @@ class Fetcher(six.Iterator):
# When fetching an offset that is in the middle of a
# compressed batch, we will get all messages in the batch.
# But we want to start 'take' at the fetch_offset
+ # (or the next highest offset in case the message was compacted)
for i, msg in enumerate(messages):
- if msg.offset == fetch_offset:
+ if msg.offset < fetch_offset:
+ log.debug("Skipping message offset: %s (expecting %s)",
+ msg.offset, fetch_offset)
+ else:
self.message_idx = i
break
+
else:
self.message_idx = 0
self.messages = None
@@ -868,8 +868,9 @@ class Fetcher(six.Iterator):
next_idx = self.message_idx + n
res = self.messages[self.message_idx:next_idx]
self.message_idx = next_idx
- if len(self) > 0:
- self.fetch_offset = self.messages[self.message_idx].offset
+ # fetch_offset should be incremented by 1 to parallel the
+ # subscription position (also incremented by 1)
+ self.fetch_offset = max(self.fetch_offset, res[-1].offset + 1)
return res