diff options
-rw-r--r-- | kafka/transaction.py | 3 |
1 files changed, 2 insertions, 1 deletions
diff --git a/kafka/transaction.py b/kafka/transaction.py index 10c2ebd..0dfe9d4 100644 --- a/kafka/transaction.py +++ b/kafka/transaction.py @@ -20,13 +20,14 @@ class KafkaTransaction(object): consumer = SimpleConsumer(client, group, topic, auto_commit=False) consumer.provide_partition_info() + consumer.fetch_last_known_offsets() while some_condition: with KafkaTransaction(consumer) as transaction: messages = consumer.get_messages(count, block=False) for partition, message in messages: - if can_process(message.value): + if can_process(message): transaction.mark(partition, message.offset) else: break |