Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r--  kafka/consumer/fetcher.py  75
1 file changed, 56 insertions(+), 19 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index eb85060..f116bed 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -36,6 +36,7 @@ class Fetcher(six.Iterator):
         'fetch_max_wait_ms': 500,
         'max_partition_fetch_bytes': 1048576,
         'check_crcs': True,
+        'iterator_refetch_records': 1, # undocumented -- interface may change
     }
 
     def __init__(self, client, subscriptions, **configs):
@@ -80,15 +81,28 @@ class Fetcher(six.Iterator):
         self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
         self._record_too_large_partitions = dict() # {topic_partition: offset}
         self._iterator = None
+        self._fetch_futures = collections.deque()
         #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
 
     def init_fetches(self):
         """Send FetchRequests asynchronously for all assigned partitions.
 
+        Note: noop if there are unconsumed records internal to the fetcher
+
         Returns:
             List of Futures: each future resolves to a FetchResponse
         """
+        # We need to be careful when creating fetch records during iteration
+        # so we verify that there are no records in the deque, or in an
+        # iterator
+        if self._records or self._iterator:
+            log.debug('Skipping init_fetches because there are unconsumed'
+                      ' records internally')
+            return []
+        return self._init_fetches()
+
+    def _init_fetches(self):
         futures = []
         for node_id, request in six.iteritems(self._create_fetch_requests()):
             if self._client.ready(node_id):
@@ -97,8 +111,23 @@ class Fetcher(six.Iterator):
                 future.add_callback(self._handle_fetch_response, request)
                 future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
                 futures.append(future)
+        self._fetch_futures.extend(futures)
+        self._clean_done_fetch_futures()
         return futures
 
+    def _clean_done_fetch_futures(self):
+        while True:
+            if not self._fetch_futures:
+                break
+            if not self._fetch_futures[0].is_done:
+                break
+            self._fetch_futures.popleft()
+
+    def in_flight_fetches(self):
+        """Return True if there are any unprocessed FetchRequests in flight."""
+        self._clean_done_fetch_futures()
+        return bool(self._fetch_futures)
+
     def update_fetch_positions(self, partitions):
         """Update the fetch positions for the provided partitions.
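The hunks above split the actual request sending out of init_fetches() into _init_fetches() and track the returned futures in a deque, so callers can ask whether any fetch is still outstanding. Below is a minimal, self-contained sketch of that bookkeeping, not the library code: FakeFuture and FetchFutureTracker are illustrative stand-ins, assuming only a future object that exposes an is_done flag (as kafka-python's futures do).

import collections


class FakeFuture(object):
    """Stand-in for a kafka-python future: only the is_done flag matters here."""
    def __init__(self):
        self.is_done = False


class FetchFutureTracker(object):
    def __init__(self):
        self._fetch_futures = collections.deque()

    def add(self, futures):
        # mirrors _init_fetches() extending the deque and pruning completed entries
        self._fetch_futures.extend(futures)
        self._clean_done_fetch_futures()

    def _clean_done_fetch_futures(self):
        # drop completed futures from the head; stop at the first one still pending
        while self._fetch_futures and self._fetch_futures[0].is_done:
            self._fetch_futures.popleft()

    def in_flight_fetches(self):
        self._clean_done_fetch_futures()
        return bool(self._fetch_futures)


tracker = FetchFutureTracker()
first, second = FakeFuture(), FakeFuture()
tracker.add([first, second])
assert tracker.in_flight_fetches()      # both requests still pending

first.is_done = True
assert tracker.in_flight_fetches()      # second request still outstanding

second.is_done = True
assert not tracker.in_flight_fetches()  # deque fully drained

Note also that init_fetches() itself is now a no-op while there are unconsumed records or an active iterator; the real work happens in _init_fetches(), which the iterator calls directly (see the hunks that follow).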
@@ -301,9 +330,9 @@ class Fetcher(six.Iterator):
                 elif fetch_offset == position:
                     next_offset = messages[-1][0] + 1
-                    log.debug("Returning fetched records at offset %d for assigned"
-                              " partition %s and update position to %s", position,
-                              tp, next_offset)
+                    log.log(0, "Returning fetched records at offset %d for assigned"
+                            " partition %s and update position to %s", position,
+                            tp, next_offset)
                     self._subscriptions.assignment[tp].position = next_offset
 
                     for record in self._unpack_message_set(tp, messages):
@@ -339,13 +368,18 @@ class Fetcher(six.Iterator):
             self._raise_if_unauthorized_topics()
             self._raise_if_record_too_large()
 
+            # Send additional FetchRequests when the internal queue is low
+            # this should enable moderate pipelining
+            if len(self._records) <= self.config['iterator_refetch_records']:
+                self._init_fetches()
+
             (fetch_offset, tp, messages) = self._records.popleft()
 
             if not self._subscriptions.is_assigned(tp):
                 # this can happen when a rebalance happened before
                 # fetched records are returned
-                log.warning("Not returning fetched records for partition %s"
-                            " since it is no longer assigned", tp)
+                log.debug("Not returning fetched records for partition %s"
+                          " since it is no longer assigned", tp)
                 continue
 
             # note that the consumed position should always be available
@@ -354,23 +388,29 @@ class Fetcher(six.Iterator):
             if not self._subscriptions.is_fetchable(tp):
                 # this can happen when a partition consumption paused before
                 # fetched records are returned
-                log.warning("Not returning fetched records for assigned partition"
-                            " %s since it is no longer fetchable", tp)
+                log.debug("Not returning fetched records for assigned partition"
+                          " %s since it is no longer fetchable", tp)
 
             elif fetch_offset == position:
+                log.log(0, "Returning fetched records at offset %d for assigned"
+                        " partition %s", position, tp)
                 for msg in self._unpack_message_set(tp, messages):
+
+                    # Because we are in a generator, it is possible for
+                    # assignment to change between yield calls
+                    # so we need to re-check on each loop
+                    if not self._subscriptions.is_assigned(tp):
+                        log.debug("Not returning fetched records for partition %s"
+                                  " since it is no longer assigned", tp)
+                        break
+
                     self._subscriptions.assignment[tp].position = msg.offset + 1
                     yield msg
             else:
                 # these records aren't next in line based on the last consumed
                 # position, ignore them they must be from an obsolete request
-                log.warning("Ignoring fetched records for %s at offset %s",
-                            tp, fetch_offset)
-
-            # Send any additional FetchRequests that we can now
-            # this will likely fetch each partition individually, rather than
-            # fetch multiple partitions in bulk when they are on the same broker
-            self.init_fetches()
+                log.debug("Ignoring fetched records for %s at offset %s",
+                          tp, fetch_offset)
 
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
@@ -469,8 +509,7 @@ class Fetcher(six.Iterator):
     def _create_fetch_requests(self):
         """Create fetch requests for all assigned partitions, grouped by node.
 
-        FetchRequests skipped if no leader, node has requests in flight, or we
-        have not returned all previously fetched records to consumer
+        FetchRequests skipped if no leader, or node has requests in flight
 
         Returns:
             dict: {node_id: [FetchRequest,...]}
@@ -486,9 +525,7 @@ class Fetcher(six.Iterator):
                           " Requesting metadata update", partition)
                 self._client.cluster.request_update()
             elif self._client.in_flight_request_count(node_id) == 0:
-                # if there is a leader and no in-flight requests,
-                # issue a new fetch but only fetch data for partitions whose
-                # previously fetched data has been consumed
+                # fetch if there is a leader and no in-flight requests
                 position = self._subscriptions.assignment[partition].position
                 partition_info = (
                     partition.partition,
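The iterator changes above use the new iterator_refetch_records setting to pipeline fetches: while the message generator drains the internal deque of (fetch_offset, partition, messages) batches, a new round of fetch requests is issued as soon as the deque length drops to the threshold, so the next network round trip overlaps with consumption instead of waiting for the queue to empty. The sketch below shows that refill-while-draining pattern in isolation; drain_records and make_refill are illustrative names, not kafka-python API.

import collections

ITERATOR_REFETCH_RECORDS = 1  # mirrors the new (undocumented) default


def drain_records(records, refill):
    """Yield batches from `records`, refilling whenever the deque runs low."""
    while records:
        if len(records) <= ITERATOR_REFETCH_RECORDS:
            refill(records)  # stands in for self._init_fetches()
        yield records.popleft()


def make_refill(pending_batches):
    """Build a refill callback that appends one 'fetched' batch per call."""
    pending = iter(pending_batches)

    def refill(records):
        batch = next(pending, None)
        if batch is not None:
            records.append(batch)
    return refill


records = collections.deque(['batch-0', 'batch-1'])
for batch in drain_records(records, make_refill(['batch-2', 'batch-3'])):
    print(batch)
# prints batch-0 .. batch-3: later batches arrive while earlier ones are consumed

Because refetching now happens inside the iterator at the threshold, the old end-of-loop self.init_fetches() call (which tended to fetch partitions one at a time) is removed, and _create_fetch_requests() no longer needs to check whether previously fetched data has been consumed.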