diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 86 |
1 files changed, 38 insertions, 48 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 1593018..dfbb0d6 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -209,11 +209,11 @@ class Fetcher(six.Iterator): log.debug("Ignoring fetched records for %s since it is no" " longer fetchable", partition) continue - consumed = self._subscriptions.assignment[partition].consumed - # ignore partition if its consumed offset != offset in FetchResponse + position = self._subscriptions.assignment[partition].position + # ignore partition if the current position != offset in FetchResponse # e.g. after seek() - if consumed is not None and offset == consumed: - current_out_of_range_partitions[partition] = offset + if position is not None and offset == position: + current_out_of_range_partitions[partition] = position self._offset_out_of_range_partitions.clear() if current_out_of_range_partitions: @@ -290,31 +290,30 @@ class Fetcher(six.Iterator): " since it is no longer assigned", tp) continue - # note that the consumed position should always be available + # note that the position should always be available # as long as the partition is still assigned - consumed = self._subscriptions.assignment[tp].consumed + position = self._subscriptions.assignment[tp].position if not self._subscriptions.is_fetchable(tp): - # this can happen when a partition consumption paused before + # this can happen when a partition is paused before # fetched records are returned to the consumer's poll call log.debug("Not returning fetched records for assigned partition" " %s since it is no longer fetchable", tp) - # we also need to reset the fetch positions to pretend we did - # not fetch this partition in the previous request at all - self._subscriptions.assignment[tp].fetched = consumed - elif fetch_offset == consumed: + elif fetch_offset == position: next_offset = messages[-1][0] + 1 - log.debug("Returning fetched records for assigned partition %s" - " and update consumed position to %s", tp, next_offset) - self._subscriptions.assignment[tp].consumed = next_offset + log.debug("Returning fetched records at offset %d for assigned" + " partition %s and update position to %s", position, + tp, next_offset) + self._subscriptions.assignment[tp].position = next_offset for record in self._unpack_message_set(tp, messages): drained[tp].append(record) else: # these records aren't next in line based on the last consumed # position, ignore them they must be from an obsolete request - log.debug("Ignoring fetched records for %s at offset %s", - tp, fetch_offset) + log.debug("Ignoring fetched records for %s at offset %s since" + " the current position is %d", tp, fetch_offset, + position) return dict(drained) def _unpack_message_set(self, tp, messages): @@ -351,20 +350,16 @@ class Fetcher(six.Iterator): # note that the consumed position should always be available # as long as the partition is still assigned - consumed = self._subscriptions.assignment[tp].consumed + position = self._subscriptions.assignment[tp].position if not self._subscriptions.is_fetchable(tp): # this can happen when a partition consumption paused before # fetched records are returned log.warning("Not returning fetched records for assigned partition" " %s since it is no longer fetchable", tp) - # we also need to reset the fetch positions to pretend we did - # not fetch this partition in the previous request at all - self._subscriptions.assignment[tp].fetched = consumed - - elif fetch_offset == consumed: + elif fetch_offset == position: for msg in self._unpack_message_set(tp, messages): - self._subscriptions.assignment[tp].consumed = msg.offset + 1 + self._subscriptions.assignment[tp].position = msg.offset + 1 yield msg else: # these records aren't next in line based on the last consumed @@ -494,19 +489,15 @@ class Fetcher(six.Iterator): # if there is a leader and no in-flight requests, # issue a new fetch but only fetch data for partitions whose # previously fetched data has been consumed - fetched = self._subscriptions.assignment[partition].fetched - consumed = self._subscriptions.assignment[partition].consumed - if consumed == fetched: - partition_info = ( - partition.partition, - fetched, - self.config['max_partition_fetch_bytes'] - ) - fetchable[node_id][partition.topic].append(partition_info) - else: - log.debug("Skipping FetchRequest to %s because previously" - " fetched offsets (%s) have not been fully" - " consumed yet (%s)", node_id, fetched, consumed) + position = self._subscriptions.assignment[partition].position + partition_info = ( + partition.partition, + position, + self.config['max_partition_fetch_bytes'] + ) + fetchable[node_id][partition.topic].append(partition_info) + log.debug("Adding fetch request for partition %d at offset %d", + partition, position) requests = {} for node_id, partition_data in six.iteritems(fetchable): @@ -541,15 +532,12 @@ class Fetcher(six.Iterator): # we are interested in this fetch only if the beginning # offset matches the current consumed position - consumed = self._subscriptions.assignment[tp].consumed - if consumed is None: - continue - elif consumed != fetch_offset: - # the fetched position has gotten out of sync with the - # consumed position (which might happen when a - # rebalance occurs with a fetch in-flight), so we need - # to reset the fetch position so the next fetch is right - self._subscriptions.assignment[tp].fetched = consumed + position = self._subscriptions.assignment[tp].position + if position is None or position != fetch_offset: + log.debug("Discarding fetch response for partition %s" + " since its offset %d does not match the" + " expected offset %d", tp, fetch_offset, + position) continue partial = None @@ -557,9 +545,11 @@ class Fetcher(six.Iterator): partial = messages.pop() if messages: - last_offset, _, _ = messages[-1] - self._subscriptions.assignment[tp].fetched = last_offset + 1 + log.debug("Adding fetched record for partition %s with" + " offset %d to buffered record list", tp, + position) self._records.append((fetch_offset, tp, messages)) + #last_offset, _, _ = messages[-1] #self.sensors.records_fetch_lag.record(highwater - last_offset) elif partial: # we did not read a single message from a non-empty @@ -581,7 +571,7 @@ class Fetcher(six.Iterator): else: self._offset_out_of_range_partitions[tp] = fetch_offset log.info("Fetch offset %s is out of range, resetting offset", - self._subscriptions.assignment[tp].fetched) + fetch_offset) elif error_type is Errors.TopicAuthorizationFailedError: log.warn("Not authorized to read from topic %s.", tp.topic) self._unauthorized_topics.add(tp.topic) |