summaryrefslogtreecommitdiff
path: root/kafka/consumer/fetcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--kafka/consumer/fetcher.py22
1 files changed, 17 insertions, 5 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 91d3711..c7d567e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -86,9 +86,21 @@ class Fetcher(six.Iterator):
def init_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.
+ Note: noop if there are unconsumed records internal to the fetcher
+
Returns:
List of Futures: each future resolves to a FetchResponse
"""
+ # We need to be careful when creating fetch records during iteration
+ # so we verify that there are no records in the deque, or in an
+ # iterator
+ if self._records or self._iterator:
+ log.debug('Skipping init_fetches because there are unconsumed'
+ ' records internally')
+ return []
+ return self._init_fetches()
+
+ def _init_fetches(self):
futures = []
for node_id, request in six.iteritems(self._create_fetch_requests()):
if self._client.ready(node_id):
@@ -339,6 +351,11 @@ class Fetcher(six.Iterator):
self._raise_if_unauthorized_topics()
self._raise_if_record_too_large()
+ # Send additional FetchRequests when the internal queue is low
+ # this should enable moderate pipelining
+ if len(self._records) == 1:
+ self._init_fetches()
+
(fetch_offset, tp, messages) = self._records.popleft()
if not self._subscriptions.is_assigned(tp):
@@ -378,11 +395,6 @@ class Fetcher(six.Iterator):
log.debug("Ignoring fetched records for %s at offset %s",
tp, fetch_offset)
- # Send any additional FetchRequests that we can now
- # this will likely fetch each partition individually, rather than
- # fetch multiple partitions in bulk when they are on the same broker
- self.init_fetches()
-
def __iter__(self): # pylint: disable=non-iterator-returned
return self