summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-10 22:48:09 -0800
committerDana Powers <dana.powers@rd.io>2016-01-10 23:00:09 -0800
commit76e7d13bdd736aa23507a336d04ec025636f9404 (patch)
tree8dbc05af163697f37b2299292061317acfb96941 /kafka
parent0adb71af3858a8f4d1cfb3fe072989499b3b3c4f (diff)
downloadkafka-python-76e7d13bdd736aa23507a336d04ec025636f9404.tar.gz
Check for assignment changes before yielding new record
Diffstat (limited to 'kafka')
-rw-r--r--kafka/consumer/fetcher.py9
1 files changed, 9 insertions, 0 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 6446f4a..91d3711 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -361,6 +361,15 @@ class Fetcher(six.Iterator):
log.log(0, "Returning fetched records at offset %d for assigned"
" partition %s", position, tp)
for msg in self._unpack_message_set(tp, messages):
+
+ # Because we are in a generator, it is possible for
+ # assignment to change between yield calls
+ # so we need to re-check on each loop
+ if not self._subscriptions.is_assigned(tp):
+ log.debug("Not returning fetched records for partition %s"
+ " since it is no longer assigned", tp)
+ break
+
self._subscriptions.assignment[tp].position = msg.offset + 1
yield msg
else: