diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 16 |
1 files changed, 15 insertions, 1 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e8c4ce8..5f3eb1d 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -424,6 +424,12 @@ class Fetcher(six.Iterator): elif fetch_offset == position: log.log(0, "Returning fetched records at offset %d for assigned" " partition %s", position, tp) + + # We can ignore any prior signal to drop pending message sets + # because we are starting from a fresh one where fetch_offset == position + # i.e., the user seek()'d to this position + self._subscriptions.assignment[tp].drop_pending_message_set = False + for msg in self._unpack_message_set(tp, messages): # Because we are in a generator, it is possible for @@ -436,8 +442,16 @@ class Fetcher(six.Iterator): " since it is no longer fetchable", tp) break + # If there is a seek during message iteration, + # we should stop unpacking this message set and + # wait for a new fetch response that aligns with the + # new seek position + elif self._subscriptions.assignment[tp].drop_pending_message_set: + log.debug("Skipping remainder of message set for partition %s", tp) + self._subscriptions.assignment[tp].drop_pending_message_set = False + break + # Compressed messagesets may include earlier messages - # It is also possible that the user called seek() elif msg.offset < self._subscriptions.assignment[tp].position: log.debug("Skipping message offset: %s (expecting %s)", msg.offset, |