summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-01-31 23:42:04 -0800
committerDana Powers <dana.powers@gmail.com>2016-01-31 23:42:04 -0800
commit33b9ff2e339db34684ba0189e1f4e5865bc4ed9f (patch)
tree5d96bf817bb303c6829f37633fc5c36f19e485b7
parent130155b874b6d1f629e6441068f7e7ef588a8779 (diff)
downloadkafka-python-33b9ff2e339db34684ba0189e1f4e5865bc4ed9f.tar.gz
Fetcher iterator should check for pause and seek resets
-rw-r--r--kafka/consumer/fetcher.py8
1 files changed, 5 insertions, 3 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index e2bc892..41f53aa 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -400,11 +400,13 @@ class Fetcher(six.Iterator):
for msg in self._unpack_message_set(tp, messages):
# Because we are in a generator, it is possible for
- # assignment to change between yield calls
+ # subscription state to change between yield calls
# so we need to re-check on each loop
- if not self._subscriptions.is_assigned(tp):
+ # this should catch assignment changes, pauses
+ # and resets via seek_to_beginning / seek_to_end
+ if not self._subscriptions.is_fetchable(tp):
log.debug("Not returning fetched records for partition %s"
- " since it is no longer assigned", tp)
+ " since it is no longer fetchable", tp)
break
# Compressed messagesets may include earlier messages