summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-22 00:27:32 -0700
committerDana Powers <dana.powers@gmail.com>2017-12-21 11:39:25 -0800
commit4209b881a22fbcf6453e01037c0596c7da21091b (patch)
tree6743d496abf6b0fa18d1b1cae704590a758bc614
parent1247a6fb087079ab5dc09350c27563db4823cef5 (diff)
downloadkafka-python-4209b881a22fbcf6453e01037c0596c7da21091b.tar.gz
Fix consumer iterator internal timeout to just check when coordinator needs next poll
-rw-r--r--kafka/consumer/group.py14
1 files changed, 2 insertions, 12 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index e080251..1de8aca 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1075,18 +1075,8 @@ class KafkaConsumer(six.Iterator):
def _next_timeout(self):
timeout = min(self._consumer_timeout,
- self._client.cluster.ttl() / 1000.0 + time.time())
-
- # Although the delayed_tasks timeout above should cover processing
- # HeartbeatRequests, it is still possible that HeartbeatResponses
- # are left unprocessed during a long _fetcher iteration without
- # an intermediate poll(). And because tasks are responsible for
- # rescheduling themselves, an unprocessed response will prevent
- # the next heartbeat from being sent. This check should help
- # avoid that.
- if self._use_consumer_group():
- heartbeat = time.time() + self._coordinator.heartbeat.time_to_next_heartbeat()
- timeout = min(timeout, heartbeat)
+ self._client.cluster.ttl() / 1000.0 + time.time(),
+ self._coordinator.time_to_next_poll() + time.time())
return timeout
def __iter__(self): # pylint: disable=non-iterator-returned