summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py5
1 files changed, 3 insertions, 2 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index d83c452..bd977c5 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -626,8 +626,6 @@ class KafkaConsumer(six.Iterator):
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
- # init any new fetches (won't resend pending fetches)
- self._fetcher.init_fetches()
self._client.poll(
max(0, self._consumer_timeout - time.time()) * 1000)
@@ -641,6 +639,8 @@ class KafkaConsumer(six.Iterator):
if time.time() > timeout_at:
log.debug("internal iterator timeout - breaking for poll")
break
+ else:
+ self._fetcher.init_fetches()
def __iter__(self): # pylint: disable=non-iterator-returned
return self
@@ -648,6 +648,7 @@ class KafkaConsumer(six.Iterator):
def __next__(self):
if not self._iterator:
self._iterator = self._message_generator()
+ self._fetcher.init_fetches()
# consumer_timeout_ms can be used to stop iteration early
if self.config['consumer_timeout_ms'] >= 0: