diff options
-rw-r--r-- | kafka/consumer/fetcher.py | 7 |
1 files changed, 2 insertions, 5 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c7d567e..fe29e77 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -492,8 +492,7 @@ class Fetcher(six.Iterator): def _create_fetch_requests(self): """Create fetch requests for all assigned partitions, grouped by node. - FetchRequests skipped if no leader, node has requests in flight, or we - have not returned all previously fetched records to consumer + FetchRequests skipped if no leader, or node has requests in flight Returns: dict: {node_id: [FetchRequest,...]} @@ -509,9 +508,7 @@ class Fetcher(six.Iterator): " Requesting metadata update", partition) self._client.cluster.request_update() elif self._client.in_flight_request_count(node_id) == 0: - # if there is a leader and no in-flight requests, - # issue a new fetch but only fetch data for partitions whose - # previously fetched data has been consumed + # fetch if there is a leader and no in-flight requests position = self._subscriptions.assignment[partition].position partition_info = ( partition.partition, |