diff options
| -rw-r--r-- | kafka/consumer/fetcher.py | 16 | ||||
| -rw-r--r-- | kafka/consumer/subscription_state.py | 2 | 
2 files changed, 17 insertions, 1 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e8c4ce8..5f3eb1d 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -424,6 +424,12 @@ class Fetcher(six.Iterator):              elif fetch_offset == position:                  log.log(0, "Returning fetched records at offset %d for assigned"                             " partition %s", position, tp) + +                # We can ignore any prior signal to drop pending message sets +                # because we are starting from a fresh one where fetch_offset == position +                # i.e., the user seek()'d to this position +                self._subscriptions.assignment[tp].drop_pending_message_set = False +                  for msg in self._unpack_message_set(tp, messages):                      # Because we are in a generator, it is possible for @@ -436,8 +442,16 @@ class Fetcher(six.Iterator):                                    " since it is no longer fetchable", tp)                          break +                    # If there is a seek during message iteration, +                    # we should stop unpacking this message set and +                    # wait for a new fetch response that aligns with the +                    # new seek position +                    elif self._subscriptions.assignment[tp].drop_pending_message_set: +                        log.debug("Skipping remainder of message set for partition %s", tp) +                        self._subscriptions.assignment[tp].drop_pending_message_set = False +                        break +                      # Compressed messagesets may include earlier messages -                    # It is also possible that the user called seek()                      elif msg.offset < self._subscriptions.assignment[tp].position:                          log.debug("Skipping message offset: %s (expecting %s)",                                    msg.offset, diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 1c045aa..fa09a06 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -350,6 +350,7 @@ class TopicPartitionState(object):          self.reset_strategy = None # the reset strategy if awaitingReset is set          self._position = None # offset exposed to the user          self.highwater = None +        self.drop_pending_message_set = False      def _set_position(self, offset):          assert self.has_valid_position, 'Valid position required' @@ -371,6 +372,7 @@ class TopicPartitionState(object):          self.awaiting_reset = False          self.reset_strategy = None          self.has_valid_position = True +        self.drop_pending_message_set = True      def pause(self):          self.paused = True  | 
