summary | refs | log | tree | commit | diff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--  kafka/consumer/fetcher.py | 99
1 files changed, 92 insertions, 7 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 5109523..d09f9da 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -94,6 +94,7 @@ class Fetcher(six.Iterator):
self._unauthorized_topics = set()
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
self._record_too_large_partitions = dict() # {topic_partition: offset}
+ self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
@@ -375,6 +376,90 @@ class Fetcher(six.Iterator):
part.discard()
return 0
def _message_generator(self):
    """Iterate over buffered fetch responses, yielding messages one at a time.

    Advances the consumed position for each partition as messages are
    yielded, and pipelines additional FetchRequests when the internal
    record queue runs low.

    Because this runs as a generator, subscription state can change
    between yields (rebalance, pause, seek), so assignment and
    fetchability are re-checked on every loop iteration.
    """
    if self._subscriptions.needs_partition_assignment:
        # BUGFIX (PEP 479): raising StopIteration inside a generator is
        # converted to RuntimeError on Python 3.7+. A bare return ends
        # the generator cleanly with the same iteration semantics.
        return

    while self._records:

        # Check on each iteration since this is a generator
        self._raise_if_offset_out_of_range()
        self._raise_if_unauthorized_topics()
        self._raise_if_record_too_large()

        # Send additional FetchRequests when the internal queue is low
        # this should enable moderate pipelining
        if len(self._records) <= self.config['iterator_refetch_records']:
            self.send_fetches()

        part = self._records.popleft()

        tp = part.topic_partition
        fetch_offset = part.fetch_offset
        if not self._subscriptions.is_assigned(tp):
            # this can happen when a rebalance happened before
            # fetched records are returned
            log.debug("Not returning fetched records for partition %s"
                      " since it is no longer assigned", tp)
            continue

        # note that the position should always be available
        # as long as the partition is still assigned
        position = self._subscriptions.assignment[tp].position
        if not self._subscriptions.is_fetchable(tp):
            # this can happen when a partition is paused before
            # fetched records are returned
            log.debug("Not returning fetched records for assigned partition"
                      " %s since it is no longer fetchable", tp)

        elif fetch_offset == position:
            # level 0 ("trace") logging -- below DEBUG on the hot path
            log.log(0, "Returning fetched records at offset %d for assigned"
                       " partition %s", position, tp)

            # We can ignore any prior signal to drop pending message sets
            # because we are starting from a fresh one where fetch_offset == position
            # i.e., the user seek()'d to this position
            self._subscriptions.assignment[tp].drop_pending_message_set = False

            for msg in part.messages:

                # Because we are in a generator, it is possible for
                # subscription state to change between yield calls
                # so we need to re-check on each loop
                # this should catch assignment changes, pauses
                # and resets via seek_to_beginning / seek_to_end
                if not self._subscriptions.is_fetchable(tp):
                    log.debug("Not returning fetched records for partition %s"
                              " since it is no longer fetchable", tp)
                    break

                # If there is a seek during message iteration,
                # we should stop unpacking this message set and
                # wait for a new fetch response that aligns with the
                # new seek position
                elif self._subscriptions.assignment[tp].drop_pending_message_set:
                    log.debug("Skipping remainder of message set for partition %s", tp)
                    self._subscriptions.assignment[tp].drop_pending_message_set = False
                    break

                # Compressed messagesets may include earlier messages
                elif msg.offset < self._subscriptions.assignment[tp].position:
                    log.debug("Skipping message offset: %s (expecting %s)",
                              msg.offset,
                              self._subscriptions.assignment[tp].position)
                    continue

                self._subscriptions.assignment[tp].position = msg.offset + 1
                yield msg

        else:
            # these records aren't next in line based on the last consumed
            # position, ignore them they must be from an obsolete request
            log.debug("Ignoring fetched records for %s at offset %s since"
                      " the current position is %d", tp, part.fetch_offset,
                      position)
def _unpack_message_set(self, tp, messages):
try:
for offset, size, msg in messages:
@@ -448,13 +533,13 @@ class Fetcher(six.Iterator):
return self
def __next__(self):
    """Return the next fetched message.

    Lazily creates the underlying message generator on first use and
    recreates it on the call after exhaustion.
    """
    it = self._iterator
    if not it:
        # Build a fresh generator over the buffered fetch responses.
        it = self._iterator = self._message_generator()
    try:
        return next(it)
    except StopIteration:
        # Exhausted: drop the generator so a later call starts over.
        self._iterator = None
        raise
def _deserialize(self, msg):
if self.config['key_deserializer']: