diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-12 14:38:16 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-12 14:38:16 -0800 |
commit | e58b447b8e9a7eaa307244b7a315c19ac00381a0 (patch) | |
tree | 581b2fcb1f7f2ec06800a7ef07ef2893c23c9450 /kafka | |
parent | 1c4a8bfc671282c002e39fd67afd5f4ccef0ee4c (diff) | |
download | kafka-python-e58b447b8e9a7eaa307244b7a315c19ac00381a0.tar.gz |
Use private deque to track in-flight fetchrequests
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/fetcher.py | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index fe29e77..6cafb65 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -80,6 +80,7 @@ class Fetcher(six.Iterator): self._offset_out_of_range_partitions = dict() # {topic_partition: offset} self._record_too_large_partitions = dict() # {topic_partition: offset} self._iterator = None + self._fetch_futures = collections.deque() #self.sensors = FetchManagerMetrics(metrics, metric_group_prefix) @@ -109,8 +110,23 @@ class Fetcher(six.Iterator): future.add_callback(self._handle_fetch_response, request) future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id) futures.append(future) + self._fetch_futures.extend(futures) + self._clean_done_fetch_futures() return futures + def _clean_done_fetch_futures(self): + while True: + if not self._fetch_futures: + break + if not self._fetch_futures[0].is_done: + break + self._fetch_futures.popleft() + + def in_flight_fetches(self): + """Return True if there are any unprocessed FetchRequests in flight.""" + self._clean_done_fetch_futures() + return bool(self._fetch_futures) + def update_fetch_positions(self, partitions): """Update the fetch positions for the provided partitions. |