diff options
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 95 |
1 files changed, 88 insertions, 7 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index efadde1..3ab68a7 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -298,6 +298,8 @@ class KafkaConsumer(six.Iterator): assignors=self.config['partition_assignment_strategy'], **self.config) self._closed = False + self._iterator = None + self._consumer_timeout = float('inf') if topics: self._subscription.subscribe(topics=topics) @@ -835,17 +837,96 @@ class KafkaConsumer(six.Iterator): # then do any offset lookups in case some positions are not known self._fetcher.update_fetch_positions(partitions) + def _message_generator(self): + assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment' + while time.time() < self._consumer_timeout: + + if self._use_consumer_group(): + self._coordinator.ensure_coordinator_known() + self._coordinator.ensure_active_group() + + # 0.8.2 brokers support kafka-backed offset storage via group coordinator + elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2): + self._coordinator.ensure_coordinator_known() + + # fetch offsets for any subscribed partitions that we arent tracking yet + if not self._subscription.has_all_fetch_positions(): + partitions = self._subscription.missing_fetch_positions() + self._update_fetch_positions(partitions) + + poll_ms = 1000 * (self._consumer_timeout - time.time()) + if not self._fetcher.in_flight_fetches(): + poll_ms = 0 + self._client.poll(timeout_ms=poll_ms, sleep=True) + + # We need to make sure we at least keep up with scheduled tasks, + # like heartbeats, auto-commits, and metadata refreshes + timeout_at = self._next_timeout() + + # Because the consumer client poll does not sleep unless blocking on + # network IO, we need to explicitly sleep when we know we are idle + # because we haven't been assigned any partitions to fetch / consume + if self._use_consumer_group() and not self.assignment(): + sleep_time = max(timeout_at - time.time(), 0) + if sleep_time > 0 and not self._client.in_flight_request_count(): + log.debug('No partitions assigned; sleeping for %s', sleep_time) + time.sleep(sleep_time) + continue + + # Short-circuit the fetch iterator if we are already timed out + # to avoid any unintentional interaction with fetcher setup + if time.time() > timeout_at: + continue + + for msg in self._fetcher: + yield msg + if time.time() > timeout_at: + log.debug("internal iterator timeout - breaking for poll") + break + + # an else block on a for loop only executes if there was no break + # so this should only be called on a StopIteration from the fetcher + # and we assume that it is safe to init_fetches when fetcher is done + # i.e., there are no more records stored internally + else: + self._fetcher.send_fetches() + + def _next_timeout(self): + timeout = min(self._consumer_timeout, + self._client._delayed_tasks.next_at() + time.time(), + self._client.cluster.ttl() / 1000.0 + time.time()) + + # Although the delayed_tasks timeout above should cover processing + # HeartbeatRequests, it is still possible that HeartbeatResponses + # are left unprocessed during a long _fetcher iteration without + # an intermediate poll(). And because tasks are responsible for + # rescheduling themselves, an unprocessed response will prevent + # the next heartbeat from being sent. This check should help + # avoid that. + if self._use_consumer_group(): + heartbeat = time.time() + self._coordinator.heartbeat.ttl() + timeout = min(timeout, heartbeat) + return timeout + def __iter__(self): # pylint: disable=non-iterator-returned return self def __next__(self): - ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1) - if not ret: - raise StopIteration - assert len(ret) == 1 - (messages,) = ret.values() - assert len(messages) == 1 - return messages[0] + if not self._iterator: + self._iterator = self._message_generator() + + self._set_consumer_timeout() + try: + return next(self._iterator) + except StopIteration: + self._iterator = None + raise + + def _set_consumer_timeout(self): + # consumer_timeout_ms can be used to stop iteration early + if self.config['consumer_timeout_ms'] >= 0: + self._consumer_timeout = time.time() + ( + self.config['consumer_timeout_ms'] / 1000.0) # old KafkaConsumer methods are deprecated def configure(self, **configs): |