diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 22 |
1 files changed, 17 insertions, 5 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 91d3711..c7d567e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -86,9 +86,21 @@ class Fetcher(six.Iterator): def init_fetches(self): """Send FetchRequests asynchronously for all assigned partitions. + Note: noop if there are unconsumed records internal to the fetcher + Returns: List of Futures: each future resolves to a FetchResponse """ + # We need to be careful when creating fetch records during iteration + # so we verify that there are no records in the deque, or in an + # iterator + if self._records or self._iterator: + log.debug('Skipping init_fetches because there are unconsumed' + ' records internally') + return [] + return self._init_fetches() + + def _init_fetches(self): futures = [] for node_id, request in six.iteritems(self._create_fetch_requests()): if self._client.ready(node_id): @@ -339,6 +351,11 @@ class Fetcher(six.Iterator): self._raise_if_unauthorized_topics() self._raise_if_record_too_large() + # Send additional FetchRequests when the internal queue is low + # this should enable moderate pipelining + if len(self._records) == 1: + self._init_fetches() + (fetch_offset, tp, messages) = self._records.popleft() if not self._subscriptions.is_assigned(tp): @@ -378,11 +395,6 @@ class Fetcher(six.Iterator): log.debug("Ignoring fetched records for %s at offset %s", tp, fetch_offset) - # Send any additional FetchRequests that we can now - # this will likely fetch each partition individually, rather than - # fetch multiple partitions in bulk when they are on the same broker - self.init_fetches() - def __iter__(self): # pylint: disable=non-iterator-returned return self |