diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-06 11:47:41 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-06 11:47:41 -0700 |
commit | 78ad43600c469c05a5b0e32c6be27048749cd58e (patch) | |
tree | 8e5b4da7c07101eb96ff90f7fc8da38ddd25c34c /kafka/consumer/fetcher.py | |
parent | 3ef15f9d60af01ce397737b4d356618385b8884f (diff) | |
download | kafka-python-fetch.tar.gz |
Dont send FetchRequest for (obviously) pending datafetch
Diffstat (limited to 'kafka/consumer/fetcher.py')
-rw-r--r-- | kafka/consumer/fetcher.py | 15 |
1 files changed, 12 insertions, 3 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 71d2ed2..4769c2e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -537,15 +537,24 @@ class Fetcher(six.Iterator): # which can be passed to FetchRequest() via .items() fetchable = collections.defaultdict(lambda: collections.defaultdict(list)) + # avoid re-fetching pending offsets + pending = set() + for fetch_offset, tp, _ in self._records: + pending.add((tp, fetch_offset)) + for partition in self._subscriptions.fetchable_partitions(): node_id = self._client.cluster.leader_for_partition(partition) + position = self._subscriptions.assignment[partition].position + + # fetch if there is a leader, no in-flight requests, and no _records if node_id is None or node_id == -1: log.debug("No leader found for partition %s." " Requesting metadata update", partition) self._client.cluster.request_update() - elif self._client.in_flight_request_count(node_id) == 0: - # fetch if there is a leader and no in-flight requests - position = self._subscriptions.assignment[partition].position + + elif ((partition, position) not in pending and + self._client.in_flight_request_count(node_id) == 0): + partition_info = ( partition.partition, position, |