diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-01-10 16:40:29 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-01-10 16:40:29 -0800 |
commit | ca08f759df60f4e7debaacc9b674e3191d5106bb (patch) | |
tree | cdc73d91abf37cee9771d7e5382a493525a3b694 /kafka/consumer/group.py | |
parent | 8ae2a3073134ff58f1314bf64165456a8e627b0a (diff) | |
parent | 98e4ab3cb931ee110faa22f6afa6c72523e24db9 (diff) | |
download | kafka-python-ca08f759df60f4e7debaacc9b674e3191d5106bb.tar.gz |
Merge pull request #502 from dpkp/task_poll_timeout
Add delayed task timeouts to _poll calls
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 4930ba1..75fe3ee 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -623,19 +623,19 @@ class KafkaConsumer(six.Iterator): # fetch positions if we have partitions we're subscribed to that we # don't know the offset for if not self._subscription.has_all_fetch_positions(): - self._update_fetch_positions(self._subscription.missing_fetch_positions()) + partitions = self._subscription.missing_fetch_positions() + self._update_fetch_positions(partitions) # init any new fetches (won't resend pending fetches) self._fetcher.init_fetches() - self._client.poll(self.config['request_timeout_ms'] / 1000.0) - timeout = self._consumer_timeout - if self.config['api_version'] >= (0, 9): - heartbeat_timeout = time.time() + ( - self.config['heartbeat_interval_ms'] / 1000.0) - timeout = min(heartbeat_timeout, timeout) + self._client.poll() + + timeout_at = min(self._consumer_timeout, + self._client._delayed_tasks.next_at(), + self._client.cluster.ttl() / 1000.0 + time.time()) for msg in self._fetcher: yield msg - if time.time() > timeout: + if time.time() > timeout_at: break def __iter__(self): # pylint: disable=non-iterator-returned |