 kafka/consumer/fetcher.py | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c9bbb97..4f2a543 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -372,11 +372,6 @@ class Fetcher(six.Iterator):
                           tp, next_offset)
                 for record in part_records:
-                    # Fetched compressed messages may include additional records
-                    if record.offset < fetch_offset:
-                        log.debug("Skipping message offset: %s (expecting %s)",
-                                  record.offset, fetch_offset)
-                        continue
                     drained[tp].append(record)

                 self._subscriptions.assignment[tp].position = next_offset
@@ -843,10 +838,15 @@ class Fetcher(six.Iterator):
             # When fetching an offset that is in the middle of a
             # compressed batch, we will get all messages in the batch.
             # But we want to start 'take' at the fetch_offset
+            # (or the next highest offset in case the message was compacted)
             for i, msg in enumerate(messages):
-                if msg.offset == fetch_offset:
+                if msg.offset < fetch_offset:
+                    log.debug("Skipping message offset: %s (expecting %s)",
+                              msg.offset, fetch_offset)
+                else:
                     self.message_idx = i
                     break
+
             else:
                 self.message_idx = 0
                 self.messages = None
@@ -868,8 +868,9 @@ class Fetcher(six.Iterator):
             next_idx = self.message_idx + n
             res = self.messages[self.message_idx:next_idx]
             self.message_idx = next_idx
-            if len(self) > 0:
-                self.fetch_offset = self.messages[self.message_idx].offset
+            # fetch_offset should be incremented by 1 to parallel the
+            # subscription position (also incremented by 1)
+            self.fetch_offset = max(self.fetch_offset, res[-1].offset + 1)
             return res
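Taken together, the patch drops the per-record skip from the drain loop and instead positions PartitionRecords on the first offset at or above fetch_offset, so a fetch that lands inside a compacted (or compressed) batch resumes at the next available message, and take() advances fetch_offset one past the last record it returned. Below is a minimal, self-contained sketch of that behaviour; CompactedPartitionRecords and the Record tuple are simplified stand-ins for illustration, not the classes in kafka/consumer/fetcher.py.

from collections import namedtuple

# Hypothetical stand-in for a fetched record; the real objects carry more fields.
Record = namedtuple('Record', ['offset', 'value'])


class CompactedPartitionRecords(object):
    """Stripped-down sketch of the skip/advance behaviour, not the real class."""

    def __init__(self, fetch_offset, messages):
        self.fetch_offset = fetch_offset
        self.messages = messages
        self.message_idx = 0
        # Start taking at fetch_offset, or at the next higher offset if the
        # requested message was removed by log compaction.
        for i, msg in enumerate(messages):
            if msg.offset < fetch_offset:
                continue  # offset precedes the fetch position, skip it
            self.message_idx = i
            break
        else:
            # Every fetched message precedes fetch_offset; nothing to return.
            self.messages = None

    def __len__(self):
        if self.messages is None:
            return 0
        return max(0, len(self.messages) - self.message_idx)

    def take(self, n):
        if not len(self):
            return []
        next_idx = self.message_idx + n
        res = self.messages[self.message_idx:next_idx]
        self.message_idx = next_idx
        # Advance one past the last returned offset, mirroring how the
        # consumer's subscription position is advanced.
        self.fetch_offset = max(self.fetch_offset, res[-1].offset + 1)
        return res


# Offsets 3 and 4 were compacted away; a fetch positioned at 3 should yield 5.
part = CompactedPartitionRecords(3, [Record(1, 'a'), Record(2, 'b'), Record(5, 'c')])
assert [r.offset for r in part.take(2)] == [5]
assert part.fetch_offset == 6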