-rw-r--r--  kafka/consumer/fetcher.py             | 86
-rw-r--r--  kafka/consumer/group.py               |  4
-rw-r--r--  kafka/consumer/subscription_state.py  | 34
-rw-r--r--  kafka/coordinator/consumer.py         |  3
4 files changed, 53 insertions, 74 deletions
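
The patch below collapses the consumer's separate fetched and consumed offsets into a single position per assigned partition: fetch requests are issued at position, a fetch response is only buffered and drained while its starting offset still equals position, and position advances as records are handed to the user. As a rough sketch of the consumer-facing effect (the broker address and topic name are placeholders, the assign/seek/position API of the rewritten KafkaConsumer is assumed, and import paths may differ across kafka-python versions), seek() and position() now read and write that one offset:

    from kafka import KafkaConsumer, TopicPartition

    # Placeholder broker/topic, for illustration only
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             enable_auto_commit=False)
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])

    consumer.seek(tp, 42)               # sets the partition's single position
    assert consumer.position(tp) == 42  # reads the same offset back

    # poll() advances that same position; a fetch that was in flight before
    # the seek is discarded because its offset no longer matches position.
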
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 1593018..dfbb0d6 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -209,11 +209,11 @@ class Fetcher(six.Iterator):
                 log.debug("Ignoring fetched records for %s since it is no"
                           " longer fetchable", partition)
                 continue
-            consumed = self._subscriptions.assignment[partition].consumed
-            # ignore partition if its consumed offset != offset in FetchResponse
+            position = self._subscriptions.assignment[partition].position
+            # ignore partition if the current position != offset in FetchResponse
             # e.g. after seek()
-            if consumed is not None and offset == consumed:
-                current_out_of_range_partitions[partition] = offset
+            if position is not None and offset == position:
+                current_out_of_range_partitions[partition] = position

         self._offset_out_of_range_partitions.clear()
         if current_out_of_range_partitions:
@@ -290,31 +290,30 @@ class Fetcher(six.Iterator):
                           " since it is no longer assigned", tp)
                 continue

-            # note that the consumed position should always be available
+            # note that the position should always be available
             # as long as the partition is still assigned
-            consumed = self._subscriptions.assignment[tp].consumed
+            position = self._subscriptions.assignment[tp].position
             if not self._subscriptions.is_fetchable(tp):
-                # this can happen when a partition consumption paused before
+                # this can happen when a partition is paused before
                 # fetched records are returned to the consumer's poll call
                 log.debug("Not returning fetched records for assigned partition"
                           " %s since it is no longer fetchable", tp)

-                # we also need to reset the fetch positions to pretend we did
-                # not fetch this partition in the previous request at all
-                self._subscriptions.assignment[tp].fetched = consumed
-            elif fetch_offset == consumed:
+            elif fetch_offset == position:
                 next_offset = messages[-1][0] + 1
-                log.debug("Returning fetched records for assigned partition %s"
-                          " and update consumed position to %s", tp, next_offset)
-                self._subscriptions.assignment[tp].consumed = next_offset
+                log.debug("Returning fetched records at offset %d for assigned"
+                          " partition %s and update position to %s", position,
+                          tp, next_offset)
+                self._subscriptions.assignment[tp].position = next_offset

                 for record in self._unpack_message_set(tp, messages):
                     drained[tp].append(record)
             else:
                 # these records aren't next in line based on the last consumed
                 # position, ignore them they must be from an obsolete request
-                log.debug("Ignoring fetched records for %s at offset %s",
-                          tp, fetch_offset)
+                log.debug("Ignoring fetched records for %s at offset %s since"
+                          " the current position is %d", tp, fetch_offset,
+                          position)
         return dict(drained)

     def _unpack_message_set(self, tp, messages):
@@ -351,20 +350,16 @@ class Fetcher(six.Iterator):

             # note that the consumed position should always be available
             # as long as the partition is still assigned
-            consumed = self._subscriptions.assignment[tp].consumed
+            position = self._subscriptions.assignment[tp].position
             if not self._subscriptions.is_fetchable(tp):
                 # this can happen when a partition consumption paused before
                 # fetched records are returned
                 log.warning("Not returning fetched records for assigned partition"
                             " %s since it is no longer fetchable", tp)

-                # we also need to reset the fetch positions to pretend we did
-                # not fetch this partition in the previous request at all
-                self._subscriptions.assignment[tp].fetched = consumed
-
-            elif fetch_offset == consumed:
+            elif fetch_offset == position:
                 for msg in self._unpack_message_set(tp, messages):
-                    self._subscriptions.assignment[tp].consumed = msg.offset + 1
+                    self._subscriptions.assignment[tp].position = msg.offset + 1
                     yield msg
             else:
                 # these records aren't next in line based on the last consumed
@@ -494,19 +489,15 @@ class Fetcher(six.Iterator):
                 # if there is a leader and no in-flight requests,
                 # issue a new fetch but only fetch data for partitions whose
                 # previously fetched data has been consumed
-                fetched = self._subscriptions.assignment[partition].fetched
-                consumed = self._subscriptions.assignment[partition].consumed
-                if consumed == fetched:
-                    partition_info = (
-                        partition.partition,
-                        fetched,
-                        self.config['max_partition_fetch_bytes']
-                    )
-                    fetchable[node_id][partition.topic].append(partition_info)
-                else:
-                    log.debug("Skipping FetchRequest to %s because previously"
-                              " fetched offsets (%s) have not been fully"
-                              " consumed yet (%s)", node_id, fetched, consumed)
+                position = self._subscriptions.assignment[partition].position
+                partition_info = (
+                    partition.partition,
+                    position,
+                    self.config['max_partition_fetch_bytes']
+                )
+                fetchable[node_id][partition.topic].append(partition_info)
+                log.debug("Adding fetch request for partition %d at offset %d",
+                          partition, position)

         requests = {}
         for node_id, partition_data in six.iteritems(fetchable):
@@ -541,15 +532,12 @@ class Fetcher(six.Iterator):

                     # we are interested in this fetch only if the beginning
                     # offset matches the current consumed position
-                    consumed = self._subscriptions.assignment[tp].consumed
-                    if consumed is None:
-                        continue
-                    elif consumed != fetch_offset:
-                        # the fetched position has gotten out of sync with the
-                        # consumed position (which might happen when a
-                        # rebalance occurs with a fetch in-flight), so we need
-                        # to reset the fetch position so the next fetch is right
-                        self._subscriptions.assignment[tp].fetched = consumed
+                    position = self._subscriptions.assignment[tp].position
+                    if position is None or position != fetch_offset:
+                        log.debug("Discarding fetch response for partition %s"
+                                  " since its offset %d does not match the"
+                                  " expected offset %d", tp, fetch_offset,
+                                  position)
                         continue

                     partial = None
@@ -557,9 +545,11 @@
                         partial = messages.pop()

                     if messages:
-                        last_offset, _, _ = messages[-1]
-                        self._subscriptions.assignment[tp].fetched = last_offset + 1
+                        log.debug("Adding fetched record for partition %s with"
+                                  " offset %d to buffered record list", tp,
+                                  position)
                         self._records.append((fetch_offset, tp, messages))
+                        #last_offset, _, _ = messages[-1]
                         #self.sensors.records_fetch_lag.record(highwater - last_offset)
                     elif partial:
                         # we did not read a single message from a non-empty
@@ -581,7 +571,7 @@
                 else:
                     self._offset_out_of_range_partitions[tp] = fetch_offset
                 log.info("Fetch offset %s is out of range, resetting offset",
-                         self._subscriptions.assignment[tp].fetched)
+                         fetch_offset)
             elif error_type is Errors.TopicAuthorizationFailedError:
                 log.warn("Not authorized to read from topic %s.", tp.topic)
                 self._unauthorized_topics.add(tp.topic)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 9ce1438..4930ba1 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -452,10 +452,10 @@ class KafkaConsumer(six.Iterator):
         """
         assert self._subscription.is_assigned(partition)

-        offset = self._subscription.assignment[partition].consumed
+        offset = self._subscription.assignment[partition].position
         if offset is None:
             self._update_fetch_positions(partition)
-            offset = self._subscription.assignment[partition].consumed
+            offset = self._subscription.assignment[partition].position
         return offset

     def pause(self, *partitions):
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index c60f192..9b52ffb 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -276,7 +276,7 @@ class SubscriptionState(object):
         all_consumed = {}
         for partition, state in six.iteritems(self.assignment):
             if state.has_valid_position:
-                all_consumed[partition] = OffsetAndMetadata(state.consumed, '')
+                all_consumed[partition] = OffsetAndMetadata(state.position, '')
         return all_consumed

     def need_offset_reset(self, partition, offset_reset_strategy=None):
@@ -332,41 +332,29 @@
 class TopicPartitionState(object):
     def __init__(self):
         self.committed = None # last committed position
-        self.has_valid_position = False # whether we have valid consumed and fetched positions
+        self.has_valid_position = False # whether we have valid position
         self.paused = False # whether this partition has been paused by the user
         self.awaiting_reset = False # whether we are awaiting reset
         self.reset_strategy = None # the reset strategy if awaitingReset is set
-        self._consumed = None # offset exposed to the user
-        self._fetched = None # current fetch position
+        self._position = None # offset exposed to the user

-    def _set_fetched(self, offset):
-        assert self.has_valid_position, 'Valid consumed/fetch position required'
-        self._fetched = offset
+    def _set_position(self, offset):
+        assert self.has_valid_position, 'Valid position required'
+        self._position = offset

-    def _get_fetched(self):
-        return self._fetched
+    def _get_position(self):
+        return self._position

-    fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
-
-    def _set_consumed(self, offset):
-        assert self.has_valid_position, 'Valid consumed/fetch position required'
-        self._consumed = offset
-
-    def _get_consumed(self):
-        return self._consumed
-
-    consumed = property(_get_consumed, _set_consumed, None, "last consumed position")
+    position = property(_get_position, _set_position, None, "last position")

     def await_reset(self, strategy):
         self.awaiting_reset = True
         self.reset_strategy = strategy
-        self._consumed = None
-        self._fetched = None
+        self._position = None
         self.has_valid_position = False

     def seek(self, offset):
-        self._consumed = offset
-        self._fetched = offset
+        self._position = offset
         self.awaiting_reset = False
         self.reset_strategy = None
         self.has_valid_position = True
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 48d5e14..d728624 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -167,7 +167,8 @@ class ConsumerCoordinator(BaseCoordinator):
         old_partitions_per_topic = self._partitions_per_topic
         self._partitions_per_topic = {}
         for topic in self._subscription.group_subscription():
-            self._partitions_per_topic[topic] = set(self._cluster.partitions_for_topic(topic))
+            partitions = self._cluster.partitions_for_topic(topic) or []
+            self._partitions_per_topic[topic] = set(partitions)

         if self._partitions_per_topic != old_partitions_per_topic:
             return True
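
For reference, a minimal standalone sketch of the single-position bookkeeping this change introduces; it is a toy model that mirrors the new TopicPartitionState and the drain check above, not the library module itself. One offset is kept per partition, seek() makes it valid, and a buffered fetch is drained only while its starting offset still matches that offset:

    class PartitionPositionSketch(object):
        """Toy model of the single-offset TopicPartitionState."""
        def __init__(self):
            self.has_valid_position = False
            self._position = None            # the one offset exposed to the user

        def seek(self, offset):
            self._position = offset
            self.has_valid_position = True

        def _get_position(self):
            return self._position

        def _set_position(self, offset):
            assert self.has_valid_position, 'Valid position required'
            self._position = offset

        position = property(_get_position, _set_position)


    def drain(state, fetch_offset, messages):
        """Return fetched messages only if they start at the current position."""
        if fetch_offset != state.position:
            # e.g. a seek() raced with an in-flight fetch; discard the response
            return []
        state.position = messages[-1][0] + 1     # advance past the last offset
        return messages


    state = PartitionPositionSketch()
    state.seek(5)
    print(drain(state, 5, [(5, b'k', b'v'), (6, b'k', b'v')]))  # drained; position -> 7
    print(drain(state, 5, [(5, b'k', b'v')]))                   # stale request; discarded

Because there is no longer a separate fetch offset to track, nothing has to be rewound after a pause or rebalance: any in-flight response that no longer lines up with position is simply discarded, which is what the new fetcher log messages above report.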