diff options
| -rw-r--r-- | kafka/consumer/group.py | 31 | 
1 files changed, 16 insertions, 15 deletions
| diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 0e03544..f2991b2 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -635,26 +635,22 @@ class KafkaConsumer(six.Iterator):                  partitions = self._subscription.missing_fetch_positions()                  self._update_fetch_positions(partitions) +            poll_ms = 1000 * (self._consumer_timeout - time.time()) +            if not self._fetcher.in_flight_fetches(): +                poll_ms = 0 +            self._client.poll(poll_ms) +              # We need to make sure we at least keep up with scheduled tasks,              # like heartbeats, auto-commits, and metadata refreshes -            timeout_at = min(self._consumer_timeout, -                             self._client._delayed_tasks.next_at() + time.time(), -                             self._client.cluster.ttl() / 1000.0 + time.time()) +            timeout_at = self._next_timeout()              if self.config['api_version'] >= (0, 9):                  if self.config['group_id'] is not None and not self.assignment(): -                    sleep_time = time.time() - timeout_at -                    log.debug('No partitions assigned; sleeping for %s', sleep_time) -                    time.sleep(sleep_time) -                    continue - -            poll_ms = 1000 * (time.time() - self._consumer_timeout) - -            # Dont bother blocking if there are no fetches -            if not self._fetcher.in_flight_fetches(): -                poll_ms = 0 - -            self._client.poll(poll_ms) +                    sleep_time = max(timeout_at - time.time(), 0) +                    if sleep_time > 0 and not self._client.in_flight_request_count(): +                        log.debug('No partitions assigned; sleeping for %s', sleep_time) +                        time.sleep(sleep_time) +                        continue              if time.time() > timeout_at:                  continue @@ -672,6 +668,11 @@ class KafkaConsumer(six.Iterator):              else:                  self._fetcher.init_fetches() +    def _next_timeout(self): +        return min(self._consumer_timeout, +                   self._client._delayed_tasks.next_at() + time.time(), +                   self._client.cluster.ttl() / 1000.0 + time.time()) +      def __iter__(self):  # pylint: disable=non-iterator-returned          return self | 
