summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
authorZack Dever <zack.dever@rd.io>2014-06-10 12:12:55 -0700
committerZack Dever <zack.dever@rd.io>2014-08-25 12:00:19 -0700
commiteecea8809ec71b889234dbd31c20e937a3de3580 (patch)
tree3715cbbf0bc76804fcd919cb3b85acbb609b2ae4 /kafka/consumer.py
parentd73d1690b16ea86a9fd51056f4d149f54a4dd8f0 (diff)
downloadkafka-python-eecea8809ec71b889234dbd31c20e937a3de3580.tar.gz
Move fetching last known offset logic to a stand alone function.
The `Consumer` class fetches the last known offsets in `__init__` if `auto_commit` is enabled, but it would be nice to expose this behavior for consumers that aren't using auto_commit. This doesn't change existing behavior, just exposes the ability to easily fetch and set the last known offsets. Once #162 or something similar lands this may no longer be necessary, but it looks like that might take a while to make it through.
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py26
1 files changed, 16 insertions, 10 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index ef8fbda..65b2bd6 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -100,6 +100,16 @@ class Consumer(object):
self.commit)
self.commit_timer.start()
+ if auto_commit:
+ self.fetch_last_known_offsets(partitions)
+ else:
+ for partition in partitions:
+ self.offsets[partition] = 0
+
+ def fetch_last_known_offsets(self, partitions=None):
+ if not partitions:
+ partitions = self.client.topic_partitions[self.topic]
+
def get_or_init_offset_callback(resp):
try:
kafka.common.check_error(resp)
@@ -107,16 +117,12 @@ class Consumer(object):
except kafka.common.UnknownTopicOrPartitionError:
return 0
- if auto_commit:
- for partition in partitions:
- req = OffsetFetchRequest(topic, partition)
- (offset,) = self.client.send_offset_fetch_request(group, [req],
- callback=get_or_init_offset_callback,
- fail_on_error=False)
- self.offsets[partition] = offset
- else:
- for partition in partitions:
- self.offsets[partition] = 0
+ for partition in partitions:
+ req = OffsetFetchRequest(self.topic, partition)
+ (offset,) = self.client.send_offset_fetch_request(self.group, [req],
+ callback=get_or_init_offset_callback,
+ fail_on_error=False)
+ self.offsets[partition] = offset
def commit(self, partitions=None):
"""