summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-07-01 10:33:50 +0530
committerMahendra M <mahendra.m@gmail.com>2013-07-01 10:33:50 +0530
commit1d278f0f60cb0a7b76fbc6b80c8e112a0deb2e0c (patch)
tree85df9631f144ba94c979af52beed8e139f60489a /kafka/consumer.py
parentc54a2edbaec1c4442cc63c8d3f0874b5882e90bb (diff)
downloadkafka-python-1d278f0f60cb0a7b76fbc6b80c8e112a0deb2e0c.tar.gz
Fix minor bug in offset management
In the current patch get_messages(count=1) would return zero messages the first time it is invoked after a consumer was initialized.
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py5
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 6ceea72..4c64cf2 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,3 +1,4 @@
+from collections import defaultdict
from itertools import izip_longest, repeat
import logging
import time
@@ -218,6 +219,7 @@ class SimpleConsumer(Consumer):
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = FETCH_MIN_BYTES
+ self.fetch_started = defaultdict(bool) # defaults to false
super(SimpleConsumer, self).__init__(client, group, topic,
partitions=partitions,
@@ -348,7 +350,7 @@ class SimpleConsumer(Consumer):
# An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
# problematic, since 0 is offset of a message which we have not yet
# consumed.
- if offset != 0:
+ if self.fetch_started[partition]:
offset += 1
while True:
@@ -372,6 +374,7 @@ class SimpleConsumer(Consumer):
# but the caller does not come back into the generator again.
# The message will be consumed but the status will not be
# updated in the consumer
+ self.fetch_started[partition] = True
self.offsets[partition] = message.offset
yield message
if next_offset is None: