summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py26
1 files changed, 16 insertions, 10 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index ef8fbda..65b2bd6 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -100,6 +100,16 @@ class Consumer(object):
self.commit)
self.commit_timer.start()
+ if auto_commit:
+ self.fetch_last_known_offsets(partitions)
+ else:
+ for partition in partitions:
+ self.offsets[partition] = 0
+
+ def fetch_last_known_offsets(self, partitions=None):
+ if not partitions:
+ partitions = self.client.topic_partitions[self.topic]
+
def get_or_init_offset_callback(resp):
try:
kafka.common.check_error(resp)
@@ -107,16 +117,12 @@ class Consumer(object):
except kafka.common.UnknownTopicOrPartitionError:
return 0
- if auto_commit:
- for partition in partitions:
- req = OffsetFetchRequest(topic, partition)
- (offset,) = self.client.send_offset_fetch_request(group, [req],
- callback=get_or_init_offset_callback,
- fail_on_error=False)
- self.offsets[partition] = offset
- else:
- for partition in partitions:
- self.offsets[partition] = 0
+ for partition in partitions:
+ req = OffsetFetchRequest(self.topic, partition)
+ (offset,) = self.client.send_offset_fetch_request(self.group, [req],
+ callback=get_or_init_offset_callback,
+ fail_on_error=False)
+ self.offsets[partition] = offset
def commit(self, partitions=None):
"""