summaryrefslogtreecommitdiff
path: root/kafka/consumer/base.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-03-29 17:24:56 -0700
committerDana Powers <dana.powers@rd.io>2015-03-30 15:02:44 -0700
commitb6d032cc3f1b53a6d5b395f9b14de62f547c8f1c (patch)
tree2111012432d8f23991068d3c6ea438da8fe10d2e /kafka/consumer/base.py
parent32dd817aac4130a019339afac7ef52f2b9b7acd4 (diff)
downloadkafka-python-b6d032cc3f1b53a6d5b395f9b14de62f547c8f1c.tar.gz
Fetch previously committed offsets in base consumer class so long as
a group is configured (but document that group must be None for old servers). This fixes multiprocessor consumer issue that prevented access to commit offsets if auto_commit is disabled. Also refactor fetch_last_known_offsets based on KafkaConsumer While still setting unknown offsets to 0
Diffstat (limited to 'kafka/consumer/base.py')
-rw-r--r--kafka/consumer/base.py37
1 files changed, 25 insertions, 12 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index bde3c1a..91ad82f 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -7,7 +7,7 @@ from threading import Lock
import kafka.common
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
- UnknownTopicOrPartitionError
+ UnknownTopicOrPartitionError, check_error
)
from kafka.util import ReentrantTimer
@@ -68,29 +68,42 @@ class Consumer(object):
self.commit)
self.commit_timer.start()
- if auto_commit:
+ # Set initial offsets
+ if self.group is not None:
self.fetch_last_known_offsets(partitions)
else:
for partition in partitions:
self.offsets[partition] = 0
+
def fetch_last_known_offsets(self, partitions=None):
+ if self.group is None:
+ raise ValueError('KafkaClient.group must not be None')
+
if not partitions:
partitions = self.client.get_partition_ids_for_topic(self.topic)
- def get_or_init_offset(resp):
+ for partition in partitions:
+ (resp,) = self.client.send_offset_fetch_request(
+ self.group,
+ [OffsetFetchRequest(self.topic, partition)],
+ fail_on_error=False
+ )
try:
- kafka.common.check_error(resp)
- return resp.offset
+ check_error(resp)
+ # API spec says server wont set an error here
+ # but 0.8.1.1 does actually...
except UnknownTopicOrPartitionError:
- return 0
+ pass
- for partition in partitions:
- req = OffsetFetchRequest(self.topic, partition)
- (resp,) = self.client.send_offset_fetch_request(self.group, [req],
- fail_on_error=False)
- self.offsets[partition] = get_or_init_offset(resp)
- self.fetch_offsets = self.offsets.copy()
+ # -1 offset signals no commit is currently stored
+ if resp.offset == -1:
+ self.offsets[partition] = 0
+
+ # Otherwise we committed the stored offset
+ # and need to fetch the next one
+ else:
+ self.offsets[partition] = resp.offset
def commit(self, partitions=None):
"""