diff options
-rw-r--r-- | kafka/consumer.py | 5 |
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 6ceea72..4c64cf2 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -1,3 +1,4 @@ +from collections import defaultdict from itertools import izip_longest, repeat import logging import time @@ -218,6 +219,7 @@ class SimpleConsumer(Consumer): self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = FETCH_MIN_BYTES + self.fetch_started = defaultdict(bool) # defaults to false super(SimpleConsumer, self).__init__(client, group, topic, partitions=partitions, @@ -348,7 +350,7 @@ class SimpleConsumer(Consumer): # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is # problematic, since 0 is offset of a message which we have not yet # consumed. - if offset != 0: + if self.fetch_started[partition]: offset += 1 while True: @@ -372,6 +374,7 @@ class SimpleConsumer(Consumer): # but the caller does not come back into the generator again. # The message will be consumed but the status will not be # updated in the consumer + self.fetch_started[partition] = True self.offsets[partition] = message.offset yield message if next_offset is None: |