From 458bdb50f62a0fa2556bca11cf6cc68c6e935ca6 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 10 Jan 2016 22:53:35 -0800 Subject: Reorganize init_fetches calls during iteration Generally should not init_fetches while the generator has pending messages; this revision adds an explicit check / noop to the public interface, and uses a private method internally to attempt to pipeline fetch requests. --- kafka/consumer/fetcher.py | 22 +++++++++++++++++----- kafka/consumer/group.py | 5 +++-- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 91d3711..c7d567e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -86,9 +86,21 @@ class Fetcher(six.Iterator): def init_fetches(self): """Send FetchRequests asynchronously for all assigned partitions. + Note: noop if there are unconsumed records internal to the fetcher + Returns: List of Futures: each future resolves to a FetchResponse """ + # We need to be careful when creating fetch records during iteration + # so we verify that there are no records in the deque, or in an + # iterator + if self._records or self._iterator: + log.debug('Skipping init_fetches because there are unconsumed' + ' records internally') + return [] + return self._init_fetches() + + def _init_fetches(self): futures = [] for node_id, request in six.iteritems(self._create_fetch_requests()): if self._client.ready(node_id): @@ -339,6 +351,11 @@ class Fetcher(six.Iterator): self._raise_if_unauthorized_topics() self._raise_if_record_too_large() + # Send additional FetchRequests when the internal queue is low + # this should enable moderate pipelining + if len(self._records) == 1: + self._init_fetches() + (fetch_offset, tp, messages) = self._records.popleft() if not self._subscriptions.is_assigned(tp): @@ -378,11 +395,6 @@ class Fetcher(six.Iterator): log.debug("Ignoring fetched records for %s at offset %s", tp, fetch_offset) - # Send any additional FetchRequests that we can now - # this will likely fetch each partition individually, rather than - # fetch multiple partitions in bulk when they are on the same broker - self.init_fetches() - def __iter__(self): # pylint: disable=non-iterator-returned return self diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index d83c452..bd977c5 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -626,8 +626,6 @@ class KafkaConsumer(six.Iterator): partitions = self._subscription.missing_fetch_positions() self._update_fetch_positions(partitions) - # init any new fetches (won't resend pending fetches) - self._fetcher.init_fetches() self._client.poll( max(0, self._consumer_timeout - time.time()) * 1000) @@ -641,6 +639,8 @@ class KafkaConsumer(six.Iterator): if time.time() > timeout_at: log.debug("internal iterator timeout - breaking for poll") break + else: + self._fetcher.init_fetches() def __iter__(self): # pylint: disable=non-iterator-returned return self @@ -648,6 +648,7 @@ class KafkaConsumer(six.Iterator): def __next__(self): if not self._iterator: self._iterator = self._message_generator() + self._fetcher.init_fetches() # consumer_timeout_ms can be used to stop iteration early if self.config['consumer_timeout_ms'] >= 0: -- cgit v1.2.1