diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-03-12 12:16:05 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-03-12 18:21:32 -0800 |
commit | 561a678d1de1604262be43d47919fa68bdf17b17 (patch) | |
tree | 9cef94dc5ff8315dcbb970f0eaf09e98f374f121 /kafka/consumer/group.py | |
parent | fb0b49827ff78bebd0a84c86d890394b00795bcf (diff) | |
download | kafka-python-consumer_heartbeat_fixes.tar.gz |
Consumer should timeout internal iterator if heartbeat ttl is expiredconsumer_heartbeat_fixes
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 18 |
1 files changed, 15 insertions, 3 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 637ef93..9db4b5d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -752,9 +752,21 @@ class KafkaConsumer(six.Iterator): self._fetcher.init_fetches() def _next_timeout(self): - return min(self._consumer_timeout, - self._client._delayed_tasks.next_at() + time.time(), - self._client.cluster.ttl() / 1000.0 + time.time()) + timeout = min(self._consumer_timeout, + self._client._delayed_tasks.next_at() + time.time(), + self._client.cluster.ttl() / 1000.0 + time.time()) + + # Although the delayed_tasks timeout above should cover processing + # HeartbeatRequests, it is still possible that HeartbeatResponses + # are left unprocessed during a long _fetcher iteration without + # an intermediate poll(). And because tasks are responsible for + # rescheduling themselves, an unprocessed response will prevent + # the next heartbeat from being sent. This check should help + # avoid that. + if self._use_consumer_group(): + heartbeat = time.time() + self._coordinator.heartbeat.ttl() + timeout = min(timeout, heartbeat) + return timeout def __iter__(self): # pylint: disable=non-iterator-returned return self |