summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/transaction.py3
1 files changed, 2 insertions, 1 deletions
diff --git a/kafka/transaction.py b/kafka/transaction.py
index 10c2ebd..0dfe9d4 100644
--- a/kafka/transaction.py
+++ b/kafka/transaction.py
@@ -20,13 +20,14 @@ class KafkaTransaction(object):
consumer = SimpleConsumer(client, group, topic, auto_commit=False)
consumer.provide_partition_info()
+ consumer.fetch_last_known_offsets()
while some_condition:
with KafkaTransaction(consumer) as transaction:
messages = consumer.get_messages(count, block=False)
for partition, message in messages:
- if can_process(message.value):
+ if can_process(message):
transaction.mark(partition, message.offset)
else:
break