summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-10 22:53:35 -0800
committerDana Powers <dana.powers@rd.io>2016-01-10 23:00:10 -0800
commit458bdb50f62a0fa2556bca11cf6cc68c6e935ca6 (patch)
tree3b3f1c0334c6f09d69f03b0b21dc49406da8f9a9 /kafka/consumer/group.py
parent76e7d13bdd736aa23507a336d04ec025636f9404 (diff)
downloadkafka-python-458bdb50f62a0fa2556bca11cf6cc68c6e935ca6.tar.gz
Reorganize init_fetches calls during iteration
Generally should not init_fetches while the generator has pending messages; this revision adds an explicit check / noop to the public interface, and uses a private method internally to attempt to pipeline fetch requests.
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py5
1 files changed, 3 insertions, 2 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index d83c452..bd977c5 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -626,8 +626,6 @@ class KafkaConsumer(six.Iterator):
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
- # init any new fetches (won't resend pending fetches)
- self._fetcher.init_fetches()
self._client.poll(
max(0, self._consumer_timeout - time.time()) * 1000)
@@ -641,6 +639,8 @@ class KafkaConsumer(six.Iterator):
if time.time() > timeout_at:
log.debug("internal iterator timeout - breaking for poll")
break
+ else:
+ self._fetcher.init_fetches()
def __iter__(self): # pylint: disable=non-iterator-returned
return self
@@ -648,6 +648,7 @@ class KafkaConsumer(six.Iterator):
def __next__(self):
if not self._iterator:
self._iterator = self._message_generator()
+ self._fetcher.init_fetches()
# consumer_timeout_ms can be used to stop iteration early
if self.config['consumer_timeout_ms'] >= 0: