-rw-r--r--  kafka/consumer/fetcher.py  22
-rw-r--r--  kafka/consumer/group.py     5
2 files changed, 20 insertions, 7 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 91d3711..c7d567e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -86,9 +86,21 @@ class Fetcher(six.Iterator):
     def init_fetches(self):
         """Send FetchRequests asynchronously for all assigned partitions.
 
+        Note: noop if there are unconsumed records internal to the fetcher
+
         Returns:
             List of Futures: each future resolves to a FetchResponse
         """
+        # We need to be careful when creating fetch records during iteration
+        # so we verify that there are no records in the deque, or in an
+        # iterator
+        if self._records or self._iterator:
+            log.debug('Skipping init_fetches because there are unconsumed'
+                      ' records internally')
+            return []
+        return self._init_fetches()
+
+    def _init_fetches(self):
         futures = []
         for node_id, request in six.iteritems(self._create_fetch_requests()):
             if self._client.ready(node_id):
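
The guard added above makes init_fetches a no-op whenever fetched data is still buffered, so new requests are only issued once the consumer has caught up. A minimal standalone sketch of that pattern, with a hypothetical `_send_fetches` standing in for the request-building code (this is not the library's actual Fetcher class):

```python
from collections import deque


class GuardedFetcher(object):
    """Sketch: only issue new fetch requests when nothing is buffered."""

    def __init__(self):
        self._records = deque()   # buffered (fetch_offset, partition, messages)
        self._iterator = None     # in-flight message generator, if any

    def init_fetches(self):
        # No-op while unconsumed data remains; issuing fetches mid-iteration
        # could request offsets the iterator has not consumed yet.
        if self._records or self._iterator:
            return []
        return self._send_fetches()

    def _send_fetches(self):
        # Stand-in for building one FetchRequest per broker node and
        # returning a list of futures.
        return []
```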
@@ -339,6 +351,11 @@ class Fetcher(six.Iterator):
             self._raise_if_unauthorized_topics()
             self._raise_if_record_too_large()
 
+            # Send additional FetchRequests when the internal queue is low
+            # this should enable moderate pipelining
+            if len(self._records) == 1:
+                self._init_fetches()
+
             (fetch_offset, tp, messages) = self._records.popleft()
             if not self._subscriptions.is_assigned(tp):
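
The `len(self._records) == 1` check above is the pipelining trigger: the next round of requests goes out while the last buffered response is still being drained, so the network round trip overlaps with processing. A rough, self-contained illustration of the idea using a plain deque rather than the real Fetcher internals:

```python
from collections import deque


def drain_with_pipelining(records, refill):
    """Yield messages from buffered responses, asking `refill` for more
    as soon as only one buffered response remains."""
    while records:
        if len(records) == 1:
            refill()  # overlap the next fetch with draining the last batch
        fetch_offset, partition, messages = records.popleft()
        for message in messages:
            yield (partition, message)


# Two buffered "responses"; refill is a no-op placeholder here.
buffered = deque([(0, 'tp0', ['m1', 'm2']), (2, 'tp0', ['m3'])])
for partition, message in drain_with_pipelining(buffered, refill=lambda: None):
    print(partition, message)
```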
@@ -378,11 +395,6 @@ class Fetcher(six.Iterator):
                 log.debug("Ignoring fetched records for %s at offset %s",
                           tp, fetch_offset)
 
-        # Send any additional FetchRequests that we can now
-        # this will likely fetch each partition individually, rather than
-        # fetch multiple partitions in bulk when they are on the same broker
-        self.init_fetches()
-
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
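
The lines removed above issued fetches immediately after finishing one partition's records, which tends to produce a separate request per partition; deferring until the buffer drains lets partitions that share a leader be combined into one request per broker. A simplified sketch of that per-node grouping (the leader lookup and data shapes are hypothetical, not the library's API):

```python
from collections import defaultdict


def group_fetches_by_node(partitions, leader_for, fetch_offsets):
    """Group fetchable partitions by leader so each broker receives one
    combined fetch request instead of one request per partition."""
    by_node = defaultdict(list)
    for tp in partitions:
        by_node[leader_for(tp)].append((tp, fetch_offsets[tp]))
    return dict(by_node)


# Three partitions, two of which share broker 0.
leaders = {'t-0': 0, 't-1': 0, 't-2': 1}
offsets = {'t-0': 10, 't-1': 7, 't-2': 42}
print(group_fetches_by_node(['t-0', 't-1', 't-2'], leaders.get, offsets))
# {0: [('t-0', 10), ('t-1', 7)], 1: [('t-2', 42)]}
```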
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index d83c452..bd977c5 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -626,8 +626,6 @@ class KafkaConsumer(six.Iterator):
                 partitions = self._subscription.missing_fetch_positions()
                 self._update_fetch_positions(partitions)
 
-            # init any new fetches (won't resend pending fetches)
-            self._fetcher.init_fetches()
             self._client.poll(
                 max(0, self._consumer_timeout - time.time()) * 1000)
@@ -641,6 +639,8 @@ class KafkaConsumer(six.Iterator):
                 if time.time() > timeout_at:
                     log.debug("internal iterator timeout - breaking for poll")
                     break
+            else:
+                self._fetcher.init_fetches()
 
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
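
Together with the removal above, this moves fetch initiation to the for-loop's `else` clause, which runs only when the fetcher was drained without hitting the internal iterator timeout. A rough outline of the resulting poll / drain / refetch cycle, with `fetcher` and `client` as stand-ins for the consumer's internals:

```python
import time


def iterate(fetcher, client, consumer_timeout, iterator_timeout=1.0):
    """Simplified outline of the consumer's poll / drain / refetch cycle."""
    while time.time() < consumer_timeout:
        client.poll(max(0, consumer_timeout - time.time()) * 1000)
        timeout_at = time.time() + iterator_timeout
        for msg in fetcher:
            yield msg
            if time.time() > timeout_at:
                break  # return to poll() for heartbeats / metadata
        else:
            # Reached only when the fetcher ran dry without a break:
            # a safe point to issue the next round of fetch requests.
            fetcher.init_fetches()
```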
@@ -648,6 +648,7 @@ class KafkaConsumer(six.Iterator):
     def __next__(self):
         if not self._iterator:
             self._iterator = self._message_generator()
+            self._fetcher.init_fetches()
 
         # consumer_timeout_ms can be used to stop iteration early
         if self.config['consumer_timeout_ms'] >= 0:
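
The final hunk primes the first round of fetches when `__next__` lazily creates the message generator, so requests are already in flight before the generator's first poll. A small sketch of that lazy-init pattern (illustrative only; the constructor arguments are hypothetical):

```python
class LazyIterator(object):
    """Sketch of the __next__ pattern: build the generator lazily and kick
    off the first fetches at the same time."""

    def __init__(self, fetcher, generator_factory):
        self._fetcher = fetcher
        self._generator_factory = generator_factory
        self._iterator = None

    def __iter__(self):
        return self

    def __next__(self):
        if not self._iterator:
            self._iterator = self._generator_factory()
            self._fetcher.init_fetches()  # prime the pipeline once, up front
        try:
            return next(self._iterator)
        except StopIteration:
            self._iterator = None
            raise

    next = __next__  # python 2 alias, matching the six-based codebase
```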