summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/base.py37
-rw-r--r--kafka/consumer/multiprocess.py2
-rw-r--r--kafka/consumer/simple.py2
-rw-r--r--test/test_consumer_integration.py5
-rw-r--r--test/test_failover_integration.py2
5 files changed, 34 insertions, 14 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index bde3c1a..91ad82f 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -7,7 +7,7 @@ from threading import Lock
import kafka.common
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
- UnknownTopicOrPartitionError
+ UnknownTopicOrPartitionError, check_error
)
from kafka.util import ReentrantTimer
@@ -68,29 +68,42 @@ class Consumer(object):
self.commit)
self.commit_timer.start()
- if auto_commit:
+ # Set initial offsets
+ if self.group is not None:
self.fetch_last_known_offsets(partitions)
else:
for partition in partitions:
self.offsets[partition] = 0
+
def fetch_last_known_offsets(self, partitions=None):
+ if self.group is None:
+ raise ValueError('KafkaClient.group must not be None')
+
if not partitions:
partitions = self.client.get_partition_ids_for_topic(self.topic)
- def get_or_init_offset(resp):
+ for partition in partitions:
+ (resp,) = self.client.send_offset_fetch_request(
+ self.group,
+ [OffsetFetchRequest(self.topic, partition)],
+ fail_on_error=False
+ )
try:
- kafka.common.check_error(resp)
- return resp.offset
+ check_error(resp)
+ # API spec says server wont set an error here
+ # but 0.8.1.1 does actually...
except UnknownTopicOrPartitionError:
- return 0
+ pass
- for partition in partitions:
- req = OffsetFetchRequest(self.topic, partition)
- (resp,) = self.client.send_offset_fetch_request(self.group, [req],
- fail_on_error=False)
- self.offsets[partition] = get_or_init_offset(resp)
- self.fetch_offsets = self.offsets.copy()
+ # -1 offset signals no commit is currently stored
+ if resp.offset == -1:
+ self.offsets[partition] = 0
+
+ # Otherwise we committed the stored offset
+ # and need to fetch the next one
+ else:
+ self.offsets[partition] = resp.offset
def commit(self, partitions=None):
"""
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index 5ce8b4d..3acd470 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -93,6 +93,8 @@ class MultiProcessConsumer(Consumer):
Arguments:
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
+ If you are connecting to a server that does not support offset
+ commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
topic: the topic to consume
Keyword Arguments:
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index b50de61..ae00dab 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -73,6 +73,8 @@ class SimpleConsumer(Consumer):
Arguments:
client: a connected KafkaClient
group: a name for this consumer, used for offset storage and must be unique
+ If you are connecting to a server that does not support offset
+ commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
topic: the topic to consume
Keyword Arguments:
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index fd62d9b..403ce0f 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -53,6 +53,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def consumer(self, **kwargs):
if os.environ['KAFKA_VERSION'] == "0.8.0":
# Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
+ kwargs['group'] = None
kwargs['auto_commit'] = False
else:
kwargs.setdefault('auto_commit', True)
@@ -260,7 +261,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
self.send_messages(0, range(0, 10))
self.send_messages(1, range(10, 20))
- consumer = MultiProcessConsumer(self.client, "group1", self.topic,
+ # set group to None and auto_commit to False to avoid interactions w/
+ # offset commit/fetch apis
+ consumer = MultiProcessConsumer(self.client, None, self.topic,
auto_commit=False, iter_timeout=0)
self.assertEqual(consumer.pending(), 20)
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 7d27526..15f0338 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -183,7 +183,7 @@ class TestFailover(KafkaIntegrationTestCase):
client = KafkaClient(hosts)
group = random_string(10)
- consumer = SimpleConsumer(client, group, topic,
+ consumer = SimpleConsumer(client, None, topic,
partitions=partitions,
auto_commit=False,
iter_timeout=timeout)