diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-09 16:52:01 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-09 16:52:01 -0800 |
commit | a94d8fa1cb670b65a4815a05cda6f774f555c632 (patch) | |
tree | 7676637b69d9346519d5c0a1465dc63a2e6fd15e /kafka/consumer/group.py | |
parent | cc22d1bab82fd234f2a47d347152a321aaa0b53e (diff) | |
download | kafka-python-kafka-2978.tar.gz |
KAFKA-2978: consumer stops fetching when consumed and fetch positions get out of synckafka-2978
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 9ce1438..4930ba1 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -452,10 +452,10 @@ class KafkaConsumer(six.Iterator): """ assert self._subscription.is_assigned(partition) - offset = self._subscription.assignment[partition].consumed + offset = self._subscription.assignment[partition].position if offset is None: self._update_fetch_positions(partition) - offset = self._subscription.assignment[partition].consumed + offset = self._subscription.assignment[partition].position return offset def pause(self, *partitions): |