Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 49
1 file changed, 49 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c133a31..8a48575 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -316,6 +316,55 @@ class Fetcher(object):
                                 tp, fetch_offset)
         return dict(drained)
 
+    def __iter__(self):
+        """Iterate over fetched_records"""
+        if self._subscriptions.needs_partition_assignment:
+            raise StopIteration('Subscription needs partition assignment')
+
+        self._raise_if_offset_out_of_range()
+        self._raise_if_unauthorized_topics()
+        self._raise_if_record_too_large()
+
+        while self._records:
+            (fetch_offset, tp, messages) = self._records.popleft()
+
+            if not self._subscriptions.is_assigned(tp):
+                # this can happen when a rebalance happened before
+                # fetched records are returned
+                log.warning("Not returning fetched records for partition %s"
+                            " since it is no longer assigned", tp)
+                continue
+
+            # note that the consumed position should always be available
+            # as long as the partition is still assigned
+            consumed = self._subscriptions.assignment[tp].consumed
+            if not self._subscriptions.is_fetchable(tp):
+                # this can happen when a partition consumption paused before
+                # fetched records are returned
+                log.warning("Not returning fetched records for assigned partition"
+                            " %s since it is no longer fetchable", tp)
+
+                # we also need to reset the fetch positions to pretend we did
+                # not fetch this partition in the previous request at all
+                self._subscriptions.assignment[tp].fetched = consumed
+
+            elif fetch_offset == consumed:
+                # TODO: handle compressed messages
+                for offset, size, msg in messages:
+                    if msg.attributes:
+                        raise Errors.KafkaError('Compressed messages not supported yet')
+                    elif self.config['check_crcs'] and not msg.validate_crc():
+                        raise Errors.InvalidMessageError(msg)
+
+                    self._subscriptions.assignment[tp].consumed = offset + 1
+                    key, value = self._deserialize(msg)
+                    yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+            else:
+                # these records aren't next in line based on the last consumed
+                # position, ignore them they must be from an obsolete request
+                log.warning("Ignoring fetched records for %s at offset %s",
+                            tp, fetch_offset)
+
     def _deserialize(self, msg):
         if self.config['key_deserializer']:
             key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable
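
For context, a minimal sketch of how records produced by this iteration path are typically reached from application code, assuming the standard KafkaConsumer interface that drives the Fetcher internally; the topic name and broker address below are placeholders, not values taken from this change:

    from kafka import KafkaConsumer

    # Placeholder topic and broker address -- substitute real values.
    consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')

    for record in consumer:
        # Each item is a ConsumerRecord(topic, partition, offset, key, value),
        # as yielded by the Fetcher.__iter__ added in this diff.
        print(record.topic, record.partition, record.offset, record.key, record.value)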