summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-09-17 18:33:37 -0700
committerDana Powers <dana.powers@gmail.com>2016-09-19 21:07:14 -0700
commitb05c8da2626bd639455f1ab3c25b84c59ad43690 (patch)
tree6f6b7ec6f8f4272d8e366267bd9ebeb0797c0f4e
parente70c7b271b634e61a46ae3da54b50b8a29c24131 (diff)
downloadkafka-python-b05c8da2626bd639455f1ab3c25b84c59ad43690.tar.gz
more partial
-rw-r--r--kafka/consumer/group.py11
1 files changed, 6 insertions, 5 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 8060910..5eeaddd 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -515,11 +515,6 @@ class KafkaConsumer(six.Iterator):
while True:
records = self._poll_once(remaining, max_records)
if records:
- # before returning the fetched records, we can send off the
- # next round of fetches and avoid block waiting for their
- # responses to enable pipelining while the user is handling the
- # fetched records.
- self._fetcher.send_fetches()
return records
elapsed_ms = (time.time() - start) * 1000
@@ -556,6 +551,12 @@ class KafkaConsumer(six.Iterator):
# poll() call to commit, then just return it immediately
records, partial = self._fetcher.fetched_records(max_records)
if records:
+ # before returning the fetched records, we can send off the
+ # next round of fetches and avoid block waiting for their
+ # responses to enable pipelining while the user is handling the
+ # fetched records.
+ if not partial:
+ self._fetcher.send_fetches()
return records
# send any new fetches (won't resend pending fetches)