diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-10 22:53:35 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-10 23:00:10 -0800 |
commit | 458bdb50f62a0fa2556bca11cf6cc68c6e935ca6 (patch) | |
tree | 3b3f1c0334c6f09d69f03b0b21dc49406da8f9a9 /kafka/consumer/group.py | |
parent | 76e7d13bdd736aa23507a336d04ec025636f9404 (diff) | |
download | kafka-python-458bdb50f62a0fa2556bca11cf6cc68c6e935ca6.tar.gz |
Reorganize init_fetches calls during iteration
Generally should not init_fetches while the generator has pending
messages; this revision adds an explicit check / noop to the
public interface, and uses a private method internally to
attempt to pipeline fetch requests.
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 5 |
1 files changed, 3 insertions, 2 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index d83c452..bd977c5 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -626,8 +626,6 @@ class KafkaConsumer(six.Iterator): partitions = self._subscription.missing_fetch_positions() self._update_fetch_positions(partitions) - # init any new fetches (won't resend pending fetches) - self._fetcher.init_fetches() self._client.poll( max(0, self._consumer_timeout - time.time()) * 1000) @@ -641,6 +639,8 @@ class KafkaConsumer(six.Iterator): if time.time() > timeout_at: log.debug("internal iterator timeout - breaking for poll") break + else: + self._fetcher.init_fetches() def __iter__(self): # pylint: disable=non-iterator-returned return self @@ -648,6 +648,7 @@ class KafkaConsumer(six.Iterator): def __next__(self): if not self._iterator: self._iterator = self._message_generator() + self._fetcher.init_fetches() # consumer_timeout_ms can be used to stop iteration early if self.config['consumer_timeout_ms'] >= 0: |