-rw-r--r--   kafka/consumer/fetcher.py   99
-rw-r--r--   kafka/consumer/group.py     95
2 files changed, 180 insertions, 14 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 5109523..d09f9da 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -94,6 +94,7 @@ class Fetcher(six.Iterator):
         self._unauthorized_topics = set()
         self._offset_out_of_range_partitions = dict()  # {topic_partition: offset}
         self._record_too_large_partitions = dict()  # {topic_partition: offset}
+        self._iterator = None
         self._fetch_futures = collections.deque()
         self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
 
@@ -375,6 +376,90 @@ class Fetcher(six.Iterator):
                     part.discard()
         return 0
 
+    def _message_generator(self):
+        """Iterate over fetched_records"""
+        if self._subscriptions.needs_partition_assignment:
+            raise StopIteration('Subscription needs partition assignment')
+
+        while self._records:
+
+            # Check on each iteration since this is a generator
+            self._raise_if_offset_out_of_range()
+            self._raise_if_unauthorized_topics()
+            self._raise_if_record_too_large()
+
+            # Send additional FetchRequests when the internal queue is low
+            # this should enable moderate pipelining
+            if len(self._records) <= self.config['iterator_refetch_records']:
+                self.send_fetches()
+
+            part = self._records.popleft()
+
+            tp = part.topic_partition
+            fetch_offset = part.fetch_offset
+            if not self._subscriptions.is_assigned(tp):
+                # this can happen when a rebalance happened before
+                # fetched records are returned
+                log.debug("Not returning fetched records for partition %s"
+                          " since it is no longer assigned", tp)
+                continue
+
+            # note that the position should always be available
+            # as long as the partition is still assigned
+            position = self._subscriptions.assignment[tp].position
+            if not self._subscriptions.is_fetchable(tp):
+                # this can happen when a partition is paused before
+                # fetched records are returned
+                log.debug("Not returning fetched records for assigned partition"
+                          " %s since it is no longer fetchable", tp)
+
+            elif fetch_offset == position:
+                log.log(0, "Returning fetched records at offset %d for assigned"
+                           " partition %s", position, tp)
+
+                # We can ignore any prior signal to drop pending message sets
+                # because we are starting from a fresh one where fetch_offset == position
+                # i.e., the user seek()'d to this position
+                self._subscriptions.assignment[tp].drop_pending_message_set = False
+
+                for msg in part.messages:
+
+                    # Because we are in a generator, it is possible for
+                    # subscription state to change between yield calls
+                    # so we need to re-check on each loop
+                    # this should catch assignment changes, pauses
+                    # and resets via seek_to_beginning / seek_to_end
+                    if not self._subscriptions.is_fetchable(tp):
+                        log.debug("Not returning fetched records for partition %s"
+                                  " since it is no longer fetchable", tp)
+                        break
+
+                    # If there is a seek during message iteration,
+                    # we should stop unpacking this message set and
+                    # wait for a new fetch response that aligns with the
+                    # new seek position
+                    elif self._subscriptions.assignment[tp].drop_pending_message_set:
+                        log.debug("Skipping remainder of message set for partition %s", tp)
+                        self._subscriptions.assignment[tp].drop_pending_message_set = False
+                        break
+
+                    # Compressed messagesets may include earlier messages
+                    elif msg.offset < self._subscriptions.assignment[tp].position:
+                        log.debug("Skipping message offset: %s (expecting %s)",
+                                  msg.offset,
+                                  self._subscriptions.assignment[tp].position)
+                        continue
+
+                    self._subscriptions.assignment[tp].position = msg.offset + 1
+                    yield msg
+
+            else:
+                # these records aren't next in line based on the last consumed
+                # position; ignore them, they must be from an obsolete request
+                log.debug("Ignoring fetched records for %s at offset %s since"
+                          " the current position is %d", tp, part.fetch_offset,
+                          position)
+
     def _unpack_message_set(self, tp, messages):
         try:
             for offset, size, msg in messages:
@@ -448,13 +533,13 @@ class Fetcher(six.Iterator):
         return self
 
     def __next__(self):
-        ret, _ = self.fetched_records(max_records=1)
-        if not ret:
-            raise StopIteration
-        assert len(ret) == 1
-        (messages,) = ret.values()
-        assert len(messages) == 1
-        return messages[0]
+        if not self._iterator:
+            self._iterator = self._message_generator()
+        try:
+            return next(self._iterator)
+        except StopIteration:
+            self._iterator = None
+            raise
 
     def _deserialize(self, msg):
         if self.config['key_deserializer']:
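The __next__ rewrite above swaps the fetch-one-record call for a generator that is cached in self._iterator and rebuilt after exhaustion. A minimal standalone sketch of that pattern follows; the RefillableQueue class is a hypothetical illustration, not part of kafka-python:

import collections


class RefillableQueue(object):
    """Toy version of the cached-generator pattern used in Fetcher.__next__."""

    def __init__(self):
        self._records = collections.deque()
        self._iterator = None

    def refill(self, items):
        self._records.extend(items)

    def _message_generator(self):
        # The while condition is re-evaluated each time the generator
        # resumes, so records added between next() calls are picked up.
        while self._records:
            yield self._records.popleft()

    def __iter__(self):
        return self

    def __next__(self):
        if not self._iterator:
            self._iterator = self._message_generator()
        try:
            return next(self._iterator)
        except StopIteration:
            # Drop the exhausted generator so a later call starts fresh;
            # an already-exhausted generator raises StopIteration forever.
            self._iterator = None
            raise


q = RefillableQueue()
q.refill([1, 2])
assert next(q) == 1
q.refill([3])        # arrives while the generator is suspended
assert list(q) == [2, 3]

Resetting self._iterator on StopIteration is the load-bearing detail: without it, a consumer that temporarily ran out of records could never resume iteration after new fetches arrive.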
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index efadde1..3ab68a7 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -298,6 +298,8 @@ class KafkaConsumer(six.Iterator):
             assignors=self.config['partition_assignment_strategy'],
             **self.config)
         self._closed = False
+        self._iterator = None
+        self._consumer_timeout = float('inf')
 
         if topics:
             self._subscription.subscribe(topics=topics)
@@ -835,17 +837,96 @@ class KafkaConsumer(six.Iterator):
         # then do any offset lookups in case some positions are not known
         self._fetcher.update_fetch_positions(partitions)
 
+    def _message_generator(self):
+        assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
+        while time.time() < self._consumer_timeout:
+
+            if self._use_consumer_group():
+                self._coordinator.ensure_coordinator_known()
+                self._coordinator.ensure_active_group()
+
+            # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+            elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+                self._coordinator.ensure_coordinator_known()
+
+            # fetch offsets for any subscribed partitions that we aren't tracking yet
+            if not self._subscription.has_all_fetch_positions():
+                partitions = self._subscription.missing_fetch_positions()
+                self._update_fetch_positions(partitions)
+
+            poll_ms = 1000 * (self._consumer_timeout - time.time())
+            if not self._fetcher.in_flight_fetches():
+                poll_ms = 0
+            self._client.poll(timeout_ms=poll_ms, sleep=True)
+
+            # We need to make sure we at least keep up with scheduled tasks,
+            # like heartbeats, auto-commits, and metadata refreshes
+            timeout_at = self._next_timeout()
+
+            # Because the consumer client poll does not sleep unless blocking on
+            # network IO, we need to explicitly sleep when we know we are idle
+            # because we haven't been assigned any partitions to fetch / consume
+            if self._use_consumer_group() and not self.assignment():
+                sleep_time = max(timeout_at - time.time(), 0)
+                if sleep_time > 0 and not self._client.in_flight_request_count():
+                    log.debug('No partitions assigned; sleeping for %s', sleep_time)
+                    time.sleep(sleep_time)
+                    continue
+
+            # Short-circuit the fetch iterator if we are already timed out
+            # to avoid any unintentional interaction with fetcher setup
+            if time.time() > timeout_at:
+                continue
+
+            for msg in self._fetcher:
+                yield msg
+                if time.time() > timeout_at:
+                    log.debug("internal iterator timeout - breaking for poll")
+                    break
+
+            # an else block on a for loop only executes if there was no break
+            # so this should only be called on a StopIteration from the fetcher
+            # and we assume that it is safe to send_fetches when the fetcher is done,
+            # i.e., there are no more records stored internally
+            else:
+                self._fetcher.send_fetches()
+
+    def _next_timeout(self):
+        timeout = min(self._consumer_timeout,
+                      self._client._delayed_tasks.next_at() + time.time(),
+                      self._client.cluster.ttl() / 1000.0 + time.time())
+
+        # Although the delayed_tasks timeout above should cover processing
+        # HeartbeatRequests, it is still possible that HeartbeatResponses
+        # are left unprocessed during a long _fetcher iteration without
+        # an intermediate poll(). And because tasks are responsible for
+        # rescheduling themselves, an unprocessed response will prevent
+        # the next heartbeat from being sent. This check should help
+        # avoid that.
+        if self._use_consumer_group():
+            heartbeat = time.time() + self._coordinator.heartbeat.ttl()
+            timeout = min(timeout, heartbeat)
+        return timeout
+
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
 
     def __next__(self):
-        ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1)
-        if not ret:
-            raise StopIteration
-        assert len(ret) == 1
-        (messages,) = ret.values()
-        assert len(messages) == 1
-        return messages[0]
+        if not self._iterator:
+            self._iterator = self._message_generator()
+
+        self._set_consumer_timeout()
+        try:
+            return next(self._iterator)
+        except StopIteration:
+            self._iterator = None
+            raise
+
+    def _set_consumer_timeout(self):
+        # consumer_timeout_ms can be used to stop iteration early
+        if self.config['consumer_timeout_ms'] >= 0:
+            self._consumer_timeout = time.time() + (
+                self.config['consumer_timeout_ms'] / 1000.0)
 
     # old KafkaConsumer methods are deprecated
     def configure(self, **configs):
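With the iterator now generator-based, consumer_timeout_ms bounds the whole iteration rather than a single poll: each __next__ call re-arms self._consumer_timeout, and the generator returns (surfacing as StopIteration) once no message arrives inside that window. A usage sketch under assumed placeholders for the broker address, topic, and group id:

from kafka import KafkaConsumer

# consumer_timeout_ms=1000 re-arms a one-second window on every
# __next__ call; when the window lapses with no message available,
# the generator returns and the for-loop ends cleanly.
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         group_id='my-group',
                         consumer_timeout_ms=1000)

for message in consumer:
    # Iteration stops roughly one second after the last available message.
    print(message.topic, message.partition, message.offset, message.value)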