diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-09-17 18:33:37 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-09-19 21:07:14 -0700 |
commit | b05c8da2626bd639455f1ab3c25b84c59ad43690 (patch) | |
tree | 6f6b7ec6f8f4272d8e366267bd9ebeb0797c0f4e | |
parent | e70c7b271b634e61a46ae3da54b50b8a29c24131 (diff) | |
download | kafka-python-b05c8da2626bd639455f1ab3c25b84c59ad43690.tar.gz |
more partial
-rw-r--r-- | kafka/consumer/group.py | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 8060910..5eeaddd 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -515,11 +515,6 @@ class KafkaConsumer(six.Iterator): while True: records = self._poll_once(remaining, max_records) if records: - # before returning the fetched records, we can send off the - # next round of fetches and avoid block waiting for their - # responses to enable pipelining while the user is handling the - # fetched records. - self._fetcher.send_fetches() return records elapsed_ms = (time.time() - start) * 1000 @@ -556,6 +551,12 @@ class KafkaConsumer(six.Iterator): # poll() call to commit, then just return it immediately records, partial = self._fetcher.fetched_records(max_records) if records: + # before returning the fetched records, we can send off the + # next round of fetches and avoid block waiting for their + # responses to enable pipelining while the user is handling the + # fetched records. + if not partial: + self._fetcher.send_fetches() return records # send any new fetches (won't resend pending fetches) |