summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2014-08-25 13:22:12 -0700
committerDana Powers <dana.powers@gmail.com>2014-08-25 13:22:12 -0700
commita1603619d42066f446eed61973ab48dea9a026db (patch)
tree40f3d26f19961a48b9a94638126d19396a874364 /kafka/consumer.py
parentd73d1690b16ea86a9fd51056f4d149f54a4dd8f0 (diff)
parent02007b0e80ed4f6ec6fd23404bde7c8706201330 (diff)
downloadkafka-python-a1603619d42066f446eed61973ab48dea9a026db.tar.gz
Merge pull request #177 from zever/fetch-last-known-offsets
Move fetching last known offset logic to a stand alone function.
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py27
1 files changed, 17 insertions, 10 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index ef8fbda..bff54ab 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -100,6 +100,16 @@ class Consumer(object):
self.commit)
self.commit_timer.start()
+ if auto_commit:
+ self.fetch_last_known_offsets(partitions)
+ else:
+ for partition in partitions:
+ self.offsets[partition] = 0
+
+ def fetch_last_known_offsets(self, partitions=None):
+ if not partitions:
+ partitions = self.client.topic_partitions[self.topic]
+
def get_or_init_offset_callback(resp):
try:
kafka.common.check_error(resp)
@@ -107,16 +117,13 @@ class Consumer(object):
except kafka.common.UnknownTopicOrPartitionError:
return 0
- if auto_commit:
- for partition in partitions:
- req = OffsetFetchRequest(topic, partition)
- (offset,) = self.client.send_offset_fetch_request(group, [req],
- callback=get_or_init_offset_callback,
- fail_on_error=False)
- self.offsets[partition] = offset
- else:
- for partition in partitions:
- self.offsets[partition] = 0
+ for partition in partitions:
+ req = OffsetFetchRequest(self.topic, partition)
+ (offset,) = self.client.send_offset_fetch_request(self.group, [req],
+ callback=get_or_init_offset_callback,
+ fail_on_error=False)
+ self.offsets[partition] = offset
+ self.fetch_offsets = self.offsets.copy()
def commit(self, partitions=None):
"""