-rw-r--r--  kafka/consumer/fetcher.py | 22 +++++++++++++++++-----
-rw-r--r--  kafka/consumer/group.py   |  5 +++--
2 files changed, 20 insertions, 7 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 91d3711..c7d567e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -86,9 +86,21 @@ class Fetcher(six.Iterator):
     def init_fetches(self):
         """Send FetchRequests asynchronously for all assigned partitions.

+        Note: noop if there are unconsumed records internal to the fetcher
+
         Returns:
             List of Futures: each future resolves to a FetchResponse
         """
+        # We need to be careful when creating fetch records during iteration
+        # so we verify that there are no records in the deque, or in an
+        # iterator
+        if self._records or self._iterator:
+            log.debug('Skipping init_fetches because there are unconsumed'
+                      ' records internally')
+            return []
+        return self._init_fetches()
+
+    def _init_fetches(self):
         futures = []
         for node_id, request in six.iteritems(self._create_fetch_requests()):
             if self._client.ready(node_id):
@@ -339,6 +351,11 @@ class Fetcher(six.Iterator):
             self._raise_if_unauthorized_topics()
             self._raise_if_record_too_large()

+            # Send additional FetchRequests when the internal queue is low
+            # this should enable moderate pipelining
+            if len(self._records) == 1:
+                self._init_fetches()
+
             (fetch_offset, tp, messages) = self._records.popleft()

             if not self._subscriptions.is_assigned(tp):
@@ -378,11 +395,6 @@ class Fetcher(six.Iterator):
                     log.debug("Ignoring fetched records for %s at offset %s",
                               tp, fetch_offset)

-            # Send any additional FetchRequests that we can now
-            # this will likely fetch each partition individually, rather than
-            # fetch multiple partitions in bulk when they are on the same broker
-            self.init_fetches()
-
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self

diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index d83c452..bd977c5 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -626,8 +626,6 @@ class KafkaConsumer(six.Iterator):
                 partitions = self._subscription.missing_fetch_positions()
                 self._update_fetch_positions(partitions)

-            # init any new fetches (won't resend pending fetches)
-            self._fetcher.init_fetches()
             self._client.poll(
                 max(0, self._consumer_timeout - time.time()) * 1000)

@@ -641,6 +639,8 @@ class KafkaConsumer(six.Iterator):
                 if time.time() > timeout_at:
                     log.debug("internal iterator timeout - breaking for poll")
                     break
+                else:
+                    self._fetcher.init_fetches()

     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
@@ -648,6 +648,7 @@ class KafkaConsumer(six.Iterator):
     def __next__(self):
         if not self._iterator:
             self._iterator = self._message_generator()
+            self._fetcher.init_fetches()

         # consumer_timeout_ms can be used to stop iteration early
         if self.config['consumer_timeout_ms'] >= 0:
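
The standalone sketch below distills the pattern this patch introduces: the public init_fetches() becomes a no-op while unconsumed records sit inside the fetcher, and the internal _init_fetches() is re-invoked from the iterator when only one completed fetch remains, giving moderate pipelining. ToyFetcher, its _pending queue, _send_fetch_requests(), and the sample batches are hypothetical stand-ins for illustration only; they are not kafka-python's actual API.

# Minimal sketch of the guarded-fetch / pipelining pattern (assumptions noted above).
import collections
import logging

log = logging.getLogger(__name__)


class ToyFetcher(object):
    def __init__(self, batches):
        self._pending = collections.deque(batches)  # data still "on the broker" (hypothetical)
        self._records = collections.deque()         # completed fetches awaiting consumption
        self._iterator = None                       # in-progress message-set iteration (unused here)

    def init_fetches(self):
        # Public entry point: noop while unconsumed records remain internally,
        # mirroring the guard added in this patch.
        if self._records or self._iterator:
            log.debug('Skipping init_fetches: unconsumed records internally')
            return []
        return self._init_fetches()

    def _init_fetches(self):
        # Internal entry point: always "sends" (stands in for building and
        # dispatching real FetchRequests and returning their futures).
        log.debug('Sending fetch requests')
        return self._send_fetch_requests()

    def _send_fetch_requests(self):
        # Hypothetical stand-in: move one pending batch into the completed queue.
        if self._pending:
            self._records.append(self._pending.popleft())
        return []

    def __iter__(self):
        while self._records:
            # Pipelining: when only one completed fetch is left, request more
            # before draining it, so the consumer rarely blocks on the network.
            if len(self._records) == 1:
                self._init_fetches()
            for msg in self._records.popleft():
                yield msg


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    fetcher = ToyFetcher([['m1', 'm2'], ['m3'], ['m4', 'm5']])
    fetcher.init_fetches()   # sends: nothing is buffered yet
    fetcher.init_fetches()   # noop: the first batch is still unconsumed
    print(list(fetcher))     # ['m1', 'm2', 'm3', 'm4', 'm5']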