summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-12 14:38:16 -0800
committerDana Powers <dana.powers@rd.io>2016-01-12 14:38:16 -0800
commite58b447b8e9a7eaa307244b7a315c19ac00381a0 (patch)
tree581b2fcb1f7f2ec06800a7ef07ef2893c23c9450 /kafka
parent1c4a8bfc671282c002e39fd67afd5f4ccef0ee4c (diff)
downloadkafka-python-e58b447b8e9a7eaa307244b7a315c19ac00381a0.tar.gz
Use private deque to track in-flight fetchrequests
Diffstat (limited to 'kafka')
-rw-r--r--kafka/consumer/fetcher.py16
1 files changed, 16 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index fe29e77..6cafb65 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -80,6 +80,7 @@ class Fetcher(six.Iterator):
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
+ self._fetch_futures = collections.deque()
#self.sensors = FetchManagerMetrics(metrics, metric_group_prefix)
@@ -109,8 +110,23 @@ class Fetcher(six.Iterator):
future.add_callback(self._handle_fetch_response, request)
future.add_errback(log.error, 'Fetch to node %s failed: %s', node_id)
futures.append(future)
+ self._fetch_futures.extend(futures)
+ self._clean_done_fetch_futures()
return futures
+ def _clean_done_fetch_futures(self):
+ while True:
+ if not self._fetch_futures:
+ break
+ if not self._fetch_futures[0].is_done:
+ break
+ self._fetch_futures.popleft()
+
+ def in_flight_fetches(self):
+ """Return True if there are any unprocessed FetchRequests in flight."""
+ self._clean_done_fetch_futures()
+ return bool(self._fetch_futures)
+
def update_fetch_positions(self, partitions):
"""Update the fetch positions for the provided partitions.