diff options
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 45 |
1 files changed, 35 insertions, 10 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 704c994..333ef64 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -611,6 +611,7 @@ class KafkaConsumer(six.Iterator):
         self._fetcher.update_fetch_positions(partitions)
 
     def _message_generator(self):
+        assert self.assignment() or self.subscription() is not None
         while time.time() < self._consumer_timeout:
             if self.config['api_version'] >= (0, 8, 2):
                 self._coordinator.ensure_coordinator_known()
@@ -626,21 +627,43 @@ class KafkaConsumer(six.Iterator):
                 partitions = self._subscription.missing_fetch_positions()
                 self._update_fetch_positions(partitions)
 
-            # init any new fetches (won't resend pending fetches)
-            self._fetcher.init_fetches()
-            self._client.poll(
-                max(0, self._consumer_timeout - time.time()) * 1000)
-
+            # We need to make sure we at least keep up with scheduled tasks,
+            # like heartbeats, auto-commits, and metadata refreshes
             timeout_at = min(self._consumer_timeout,
                              self._client._delayed_tasks.next_at() + time.time(),
                              self._client.cluster.ttl() / 1000.0 + time.time())
+
+            if self.config['api_version'] >= (0, 9):
+                if not self.assignment():
+                    sleep_time = max(timeout_at - time.time(), 0)
+                    log.debug('No partitions assigned; sleeping for %s', sleep_time)
+                    time.sleep(sleep_time)
+                    continue
+
+            poll_ms = 1000 * max(0, self._consumer_timeout - time.time())
+
+            # Don't bother blocking if there are no fetches
+            if not self._fetcher.in_flight_fetches():
+                poll_ms = 0
+
+            self._client.poll(poll_ms)
+
             if time.time() > timeout_at:
                 continue
+
             for msg in self._fetcher:
                 yield msg
                 if time.time() > timeout_at:
+                    log.debug("internal iterator timeout - breaking for poll")
                     break
 
+            # an else block on a for loop only executes if there was no break
+            # so this should only be called on a StopIteration from the fetcher
+            # and we assume that it is safe to init_fetches when fetcher is done
+            # i.e., there are no more records stored internally
+            else:
+                self._fetcher.init_fetches()
+
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
 
@@ -648,17 +671,19 @@ class KafkaConsumer(six.Iterator):
         if not self._iterator:
             self._iterator = self._message_generator()
 
-        # consumer_timeout_ms can be used to stop iteration early
-        if self.config['consumer_timeout_ms'] >= 0:
-            self._consumer_timeout = time.time() + (
-                self.config['consumer_timeout_ms'] / 1000.0)
-
+        self._set_consumer_timeout()
         try:
             return next(self._iterator)
         except StopIteration:
             self._iterator = None
             raise
 
+    def _set_consumer_timeout(self):
+        # consumer_timeout_ms can be used to stop iteration early
+        if self.config['consumer_timeout_ms'] >= 0:
+            self._consumer_timeout = time.time() + (
+                self.config['consumer_timeout_ms'] / 1000.0)
+
     # old KafkaConsumer methods are deprecated
     def configure(self, **configs):
         raise NotImplementedError(