summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/fetcher.py9
1 files changed, 9 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 6446f4a..91d3711 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -361,6 +361,15 @@ class Fetcher(six.Iterator):
log.log(0, "Returning fetched records at offset %d for assigned"
" partition %s", position, tp)
for msg in self._unpack_message_set(tp, messages):
+
+ # Because we are in a generator, it is possible for
+ # assignment to change between yield calls
+ # so we need to re-check on each loop
+ if not self._subscriptions.is_assigned(tp):
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
+ break
+
self._subscriptions.assignment[tp].position = msg.offset + 1
yield msg
else: