diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-03-30 17:42:51 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-03-30 17:42:51 -0700 |
commit | 9fd08119170b64c56ea024d12ef6b0e6482d778b (patch) | |
tree | 5fb240b11a7f9e7a5ca9d2c348556967b0f94b0c /kafka/consumer/base.py | |
parent | 6fc6856746c9e27d4c96b47b1941b6ebbabcb33b (diff) | |
parent | 1d252bfc20c8b1058dc93a495c3bdb0f4ccdf590 (diff) | |
download | kafka-python-9fd08119170b64c56ea024d12ef6b0e6482d778b.tar.gz |
Merge pull request #356 from dpkp/always_fetch_offsets
fetch commit offsets in base consumer unless group is None
Diffstat (limited to 'kafka/consumer/base.py')
-rw-r--r-- | kafka/consumer/base.py | 38 |
1 files changed, 26 insertions, 12 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index bde3c1a..0bbf46c 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -7,7 +7,7 @@ from threading import Lock import kafka.common from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, - UnknownTopicOrPartitionError + UnknownTopicOrPartitionError, check_error ) from kafka.util import ReentrantTimer @@ -68,29 +68,43 @@ class Consumer(object): self.commit) self.commit_timer.start() - if auto_commit: + # Set initial offsets + if self.group is not None: self.fetch_last_known_offsets(partitions) else: for partition in partitions: self.offsets[partition] = 0 + def fetch_last_known_offsets(self, partitions=None): + if self.group is None: + raise ValueError('KafkaClient.group must not be None') + if not partitions: partitions = self.client.get_partition_ids_for_topic(self.topic) - def get_or_init_offset(resp): + responses = self.client.send_offset_fetch_request( + self.group, + [OffsetFetchRequest(self.topic, p) for p in partitions], + fail_on_error=False + ) + + for resp in responses: try: - kafka.common.check_error(resp) - return resp.offset + check_error(resp) + # API spec says server wont set an error here + # but 0.8.1.1 does actually... except UnknownTopicOrPartitionError: - return 0 + pass - for partition in partitions: - req = OffsetFetchRequest(self.topic, partition) - (resp,) = self.client.send_offset_fetch_request(self.group, [req], - fail_on_error=False) - self.offsets[partition] = get_or_init_offset(resp) - self.fetch_offsets = self.offsets.copy() + # -1 offset signals no commit is currently stored + if resp.offset == -1: + self.offsets[resp.partition] = 0 + + # Otherwise we committed the stored offset + # and need to fetch the next one + else: + self.offsets[resp.partition] = resp.offset def commit(self, partitions=None): """ |