diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-01-31 23:41:08 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-01-31 23:41:08 -0800 |
commit | 130155b874b6d1f629e6441068f7e7ef588a8779 (patch) | |
tree | c4076c547772538950dbb0ede7ab76321049870e | |
parent | 54d21bf0f90a343509ec98391b742f596507f673 (diff) | |
download | kafka-python-130155b874b6d1f629e6441068f7e7ef588a8779.tar.gz |
Fetcher should filter compressed messages with offsets lower than were requested
-rw-r--r-- | kafka/consumer/fetcher.py | 8 |
1 files changed, 8 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 0e822c4..e2bc892 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -336,6 +336,9 @@ class Fetcher(six.Iterator): self._subscriptions.assignment[tp].position = next_offset for record in self._unpack_message_set(tp, messages): + # Fetched compressed messages may include additional records + if record.offset < fetch_offset: + continue drained[tp].append(record) else: # these records aren't next in line based on the last consumed @@ -404,6 +407,11 @@ class Fetcher(six.Iterator): " since it is no longer assigned", tp) break + # Compressed messagesets may include earlier messages + # It is also possible that the user called seek() + elif msg.offset != self._subscriptions.assignment[tp].position: + continue + self._subscriptions.assignment[tp].position = msg.offset + 1 yield msg else: |