diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-10-23 11:59:40 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-10-23 11:59:40 -0700 |
commit | 45ab884a5c398fe394e1df1e5c587392e0f3faba (patch) | |
tree | f0dd38495b63bd01f0e40045fa60b1bbb9181288 /kafka/consumer/fetcher.py | |
parent | 9450a6bfff8517371162a968f4345ffc09380bb8 (diff) | |
download | kafka-python-consumer_iterator.tar.gz |
Revert consumer iterators from max_poll_recordsconsumer_iterator
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 99 |
1 files changed, 92 insertions, 7 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 5109523..d09f9da 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -94,6 +94,7 @@ class Fetcher(six.Iterator): self._unauthorized_topics = set() self._offset_out_of_range_partitions = dict() # {topic_partition: offset} self._record_too_large_partitions = dict() # {topic_partition: offset} + self._iterator = None self._fetch_futures = collections.deque() self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix']) @@ -375,6 +376,90 @@ class Fetcher(six.Iterator): part.discard() return 0 + def _message_generator(self): + """Iterate over fetched_records""" + if self._subscriptions.needs_partition_assignment: + raise StopIteration('Subscription needs partition assignment') + + while self._records: + + # Check on each iteration since this is a generator + self._raise_if_offset_out_of_range() + self._raise_if_unauthorized_topics() + self._raise_if_record_too_large() + + # Send additional FetchRequests when the internal queue is low + # this should enable moderate pipelining + if len(self._records) <= self.config['iterator_refetch_records']: + self.send_fetches() + + part = self._records.popleft() + + tp = part.topic_partition + fetch_offset = part.fetch_offset + if not self._subscriptions.is_assigned(tp): + # this can happen when a rebalance happened before + # fetched records are returned + log.debug("Not returning fetched records for partition %s" + " since it is no longer assigned", tp) + continue + + # note that the position should always be available + # as long as the partition is still assigned + position = self._subscriptions.assignment[tp].position + if not self._subscriptions.is_fetchable(tp): + # this can happen when a partition is paused before + # fetched records are returned + log.debug("Not returning fetched records for assigned partition" + " %s since it is no longer fetchable", tp) + + elif fetch_offset == position: + log.log(0, "Returning fetched records at offset %d for assigned" + " partition %s", position, tp) + + # We can ignore any prior signal to drop pending message sets + # because we are starting from a fresh one where fetch_offset == position + # i.e., the user seek()'d to this position + self._subscriptions.assignment[tp].drop_pending_message_set = False + + for msg in part.messages: + + # Because we are in a generator, it is possible for + # subscription state to change between yield calls + # so we need to re-check on each loop + # this should catch assignment changes, pauses + # and resets via seek_to_beginning / seek_to_end + if not self._subscriptions.is_fetchable(tp): + log.debug("Not returning fetched records for partition %s" + " since it is no longer fetchable", tp) + break + + # If there is a seek during message iteration, + # we should stop unpacking this message set and + # wait for a new fetch response that aligns with the + # new seek position + elif self._subscriptions.assignment[tp].drop_pending_message_set: + log.debug("Skipping remainder of message set for partition %s", tp) + self._subscriptions.assignment[tp].drop_pending_message_set = False + break + + # Compressed messagesets may include earlier messages + elif msg.offset < self._subscriptions.assignment[tp].position: + log.debug("Skipping message offset: %s (expecting %s)", + msg.offset, + self._subscriptions.assignment[tp].position) + continue + + self._subscriptions.assignment[tp].position = msg.offset + 1 + yield msg + + else: + # these records aren't next in line based on the last consumed + # position, ignore them they must be from an obsolete request + log.debug("Ignoring fetched records for %s at offset %s since" + " the current position is %d", tp, part.fetch_offset, + position) + def _unpack_message_set(self, tp, messages): try: for offset, size, msg in messages: @@ -448,13 +533,13 @@ class Fetcher(six.Iterator): return self def __next__(self): - ret, _ = self.fetched_records(max_records=1) - if not ret: - raise StopIteration - assert len(ret) == 1 - (messages,) = ret.values() - assert len(messages) == 1 - return messages[0] + if not self._iterator: + self._iterator = self._message_generator() + try: + return next(self._iterator) + except StopIteration: + self._iterator = None + raise def _deserialize(self, msg): if self.config['key_deserializer']: |