diff options
author | Zack Dever <zack.dever@rd.io> | 2014-06-10 12:12:55 -0700 |
---|---|---|
committer | Zack Dever <zack.dever@rd.io> | 2014-08-25 12:00:19 -0700 |
commit | eecea8809ec71b889234dbd31c20e937a3de3580 (patch) | |
tree | 3715cbbf0bc76804fcd919cb3b85acbb609b2ae4 | |
parent | d73d1690b16ea86a9fd51056f4d149f54a4dd8f0 (diff) | |
download | kafka-python-eecea8809ec71b889234dbd31c20e937a3de3580.tar.gz |
Move fetching last known offset logic to a stand alone function.
The `Consumer` class fetches the last known offsets in `__init__` if
`auto_commit` is enabled, but it would be nice to expose this behavior
for consumers that aren't using auto_commit. This doesn't change
existing behavior, just exposes the ability to easily fetch and set the
last known offsets. Once #162 or something similar lands this may no
longer be necessary, but it looks like that might take a while to make
it through.
-rw-r--r-- | kafka/consumer.py | 26 |
1 files changed, 16 insertions, 10 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index ef8fbda..65b2bd6 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -100,6 +100,16 @@ class Consumer(object): self.commit) self.commit_timer.start() + if auto_commit: + self.fetch_last_known_offsets(partitions) + else: + for partition in partitions: + self.offsets[partition] = 0 + + def fetch_last_known_offsets(self, partitions=None): + if not partitions: + partitions = self.client.topic_partitions[self.topic] + def get_or_init_offset_callback(resp): try: kafka.common.check_error(resp) @@ -107,16 +117,12 @@ class Consumer(object): except kafka.common.UnknownTopicOrPartitionError: return 0 - if auto_commit: - for partition in partitions: - req = OffsetFetchRequest(topic, partition) - (offset,) = self.client.send_offset_fetch_request(group, [req], - callback=get_or_init_offset_callback, - fail_on_error=False) - self.offsets[partition] = offset - else: - for partition in partitions: - self.offsets[partition] = 0 + for partition in partitions: + req = OffsetFetchRequest(self.topic, partition) + (offset,) = self.client.send_offset_fetch_request(self.group, [req], + callback=get_or_init_offset_callback, + fail_on_error=False) + self.offsets[partition] = offset def commit(self, partitions=None): """ |