diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-12 14:46:02 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-12 15:39:48 -0800 |
commit | 3e622068ea7a970c8674a518a05355b6065560f1 (patch) | |
tree | 3b1cd9fc622c2cd424d55a2a3bb704d909a380e8 /kafka/consumer/group.py | |
parent | 22e84a57cb0a33aef3b37ed0515a85244d3a1615 (diff) | |
download | kafka-python-iterator_fetches.tar.gz |
Sleep in KafkaConsumer iterator if no partition assignment; don't block in poll if no in-flight fetches (branch: iterator_fetches)
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 29 |
1 files changed, 25 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 141c1fa..333ef64 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -611,6 +611,7 @@ class KafkaConsumer(six.Iterator): self._fetcher.update_fetch_positions(partitions) def _message_generator(self): + assert self.assignment() or self.subscription() is not None while time.time() < self._consumer_timeout: if self.config['api_version'] >= (0, 8, 2): self._coordinator.ensure_coordinator_known() @@ -626,19 +627,40 @@ class KafkaConsumer(six.Iterator): partitions = self._subscription.missing_fetch_positions() self._update_fetch_positions(partitions) - self._client.poll( - max(0, self._consumer_timeout - time.time()) * 1000) - + # We need to make sure we at least keep up with scheduled tasks, + # like heartbeats, auto-commits, and metadata refreshes timeout_at = min(self._consumer_timeout, self._client._delayed_tasks.next_at() + time.time(), self._client.cluster.ttl() / 1000.0 + time.time()) + + if self.config['api_version'] >= (0, 9): + if not self.assignment(): + sleep_time = time.time() - timeout_at + log.debug('No partitions assigned; sleeping for %s', sleep_time) + time.sleep(sleep_time) + continue + + poll_ms = 1000 * (time.time() - self._consumer_timeout) + + # Dont bother blocking if there are no fetches + if not self._fetcher.in_flight_fetches(): + poll_ms = 0 + + self._client.poll(poll_ms) + if time.time() > timeout_at: continue + for msg in self._fetcher: yield msg if time.time() > timeout_at: log.debug("internal iterator timeout - breaking for poll") break + + # an else block on a for loop only executes if there was no break + # so this should only be called on a StopIteration from the fetcher + # and we assume that it is safe to init_fetches when fetcher is done + # i.e., there are no more records stored internally else: self._fetcher.init_fetches() @@ -648,7 +670,6 @@ class KafkaConsumer(six.Iterator): def __next__(self): if not self._iterator: self._iterator = 
self._message_generator() - self._fetcher.init_fetches() self._set_consumer_timeout() try: |