diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 7 |
1 files changed, 7 insertions, 0 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index c0906ad..ca66e87 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -365,6 +365,13 @@ class SimpleConsumer(Consumer): next_offset = None for message in resp.messages: next_offset = message.offset + + # update the offset before the message is yielded. This is + # so that the consumer state is not lost in certain cases. + # For eg: the message is yielded and consumed by the caller, + # but the caller does not come back into the generator again. + # The message will be consumed but the status will not be + # updated in the consumer self.offsets[partition] = message.offset yield message if next_offset is None: |