summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py18
1 files changed, 15 insertions, 3 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 637ef93..9db4b5d 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -752,9 +752,21 @@ class KafkaConsumer(six.Iterator):
self._fetcher.init_fetches()
def _next_timeout(self):
- return min(self._consumer_timeout,
- self._client._delayed_tasks.next_at() + time.time(),
- self._client.cluster.ttl() / 1000.0 + time.time())
+ timeout = min(self._consumer_timeout,
+ self._client._delayed_tasks.next_at() + time.time(),
+ self._client.cluster.ttl() / 1000.0 + time.time())
+
+ # Although the delayed_tasks timeout above should cover processing
+ # HeartbeatRequests, it is still possible that HeartbeatResponses
+ # are left unprocessed during a long _fetcher iteration without
+ # an intermediate poll(). And because tasks are responsible for
+ # rescheduling themselves, an unprocessed response will prevent
+ # the next heartbeat from being sent. This check should help
+ # avoid that.
+ if self._use_consumer_group():
+ heartbeat = time.time() + self._coordinator.heartbeat.ttl()
+ timeout = min(timeout, heartbeat)
+ return timeout
def __iter__(self): # pylint: disable=non-iterator-returned
return self