From bc573e3d63a687903a9be2e1b3da2f943a7208e1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Oct 2017 10:50:06 -0700 Subject: More tests --- kafka/consumer/fetcher.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'kafka') diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index d059a10..c4fa546 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -400,6 +400,11 @@ class Fetcher(six.Iterator): tp = self._next_partition_records.topic_partition + # We can ignore any prior signal to drop pending message sets + # because we are starting from a fresh one where fetch_offset == position + # i.e., the user seek()'d to this position + self._subscriptions.assignment[tp].drop_pending_message_set = False + for msg in self._next_partition_records.take(): # Because we are in a generator, it is possible for -- cgit v1.2.1