diff options
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index fe29e77..6cafb65 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -80,6 +80,7 @@ class Fetcher(six.Iterator): self._offset_out_of_range_partitions = dict() # {topic_partition: offset} self._record_too_large_partitions = dict() # {topic_partition: offset} self._iterator = None + self._fetch_futures = collections.deque() #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix) @@ -109,8 +110,23 @@ class Fetcher(six.Iterator): future.add_callback(self._handle_fetch_response, request) future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id) futures.append(future) + self._fetch_futures.extend(futures) + self._clean_done_fetch_futures() return futures + def _clean_done_fetch_futures(self): + while True: + if not self._fetch_futures: + break + if not self._fetch_futures[0].is_done: + break + self._fetch_futures.popleft() + + def in_flight_fetches(self): + """Return True if there are any unprocessed FetchRequests in flight.""" + self._clean_done_fetch_futures() + return bool(self._fetch_futures) + def update_fetch_positions(self, partitions): """Update the fetch positions for the provided partitions. |