summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py7
1 files changed, 7 insertions, 0 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index c0906ad..ca66e87 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -365,6 +365,13 @@ class SimpleConsumer(Consumer):
next_offset = None
for message in resp.messages:
next_offset = message.offset
+
+ # update the offset before the message is yielded. This is
+ # so that the consumer state is not lost in certain cases.
+ # For eg: the message is yielded and consumed by the caller,
+ # but the caller does not come back into the generator again.
+ # The message will be consumed but the status will not be
+ # updated in the consumer
self.offsets[partition] = message.offset
yield message
if next_offset is None: