-rw-r--r--  kafka/consumer/simple.py            57
-rw-r--r--  test/test_consumer_integration.py   44
2 files changed, 97 insertions, 4 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index 000fcd9..4c835fe 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -8,6 +8,7 @@ import logging
 import time
 
 import six
+import sys
 
 try:
     from Queue import Empty, Queue
@@ -16,7 +17,9 @@ except ImportError: # python 2
 
 from kafka.common import (
     FetchRequest, OffsetRequest,
-    ConsumerFetchSizeTooSmall, ConsumerNoMoreData
+    ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
+    UnknownTopicOrPartitionError, NotLeaderForPartitionError,
+    OffsetOutOfRangeError, check_error
 )
 from .base import (
     Consumer,
@@ -94,6 +97,10 @@ class SimpleConsumer(Consumer):
             message in the iterator before exiting. None means no
             timeout, so it will wait forever.
 
+        auto_offset_reset: default largest. Reset partition offsets upon
+            OffsetOutOfRangeError. Valid values are largest and smallest.
+            Otherwise, do not reset the offsets and raise OffsetOutOfRangeError.
+
     Auto commit details:
     If both auto_commit_every_n and auto_commit_every_t are set, they will
     reset one another when one is triggered. These triggers simply call the
@@ -106,7 +113,8 @@ class SimpleConsumer(Consumer):
                  fetch_size_bytes=FETCH_MIN_BYTES,
                  buffer_size=FETCH_BUFFER_SIZE_BYTES,
                  max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
-                 iter_timeout=None):
+                 iter_timeout=None,
+                 auto_offset_reset='largest'):
         super(SimpleConsumer, self).__init__(
             client, group, topic,
             partitions=partitions,
@@ -125,12 +133,38 @@ class SimpleConsumer(Consumer):
         self.fetch_min_bytes = fetch_size_bytes
         self.fetch_offsets = self.offsets.copy()
         self.iter_timeout = iter_timeout
+        self.auto_offset_reset = auto_offset_reset
         self.queue = Queue()
 
     def __repr__(self):
         return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
             (self.group, self.topic, str(self.offsets.keys()))
 
+    def reset_partition_offset(self, partition):
+        LATEST = -1
+        EARLIEST = -2
+        if self.auto_offset_reset == 'largest':
+            reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
+        elif self.auto_offset_reset == 'smallest':
+            reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
+        else:
+            # Raise a reasonable exception type if the user calls this
+            # outside of an exception context
+            if sys.exc_info() == (None, None, None):
+                raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
                                            'valid auto_offset_reset setting '
                                            '(largest|smallest)')
+            # Otherwise re-raise the upstream exception, because it typically
+            # includes additional data about the request that triggered it,
+            # and we do not want to drop that
+            raise
+
+        # Ask the broker for the reset offset and use it for future fetches
+        (resp, ) = self.client.send_offset_request(reqs)
+        check_error(resp)
+        self.offsets[partition] = resp.offsets[0]
+        self.fetch_offsets[partition] = resp.offsets[0]
+
     def provide_partition_info(self):
         """
         Indicates that partition info must be returned by the consumer
@@ -297,10 +331,27 @@ class SimpleConsumer(Consumer):
             responses = self.client.send_fetch_request(
                 requests,
                 max_wait_time=int(self.fetch_max_wait_time),
-                min_bytes=self.fetch_min_bytes)
+                min_bytes=self.fetch_min_bytes,
+                fail_on_error=False
+            )
 
             retry_partitions = {}
             for resp in responses:
+
+                try:
+                    check_error(resp)
+                except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
+                    self.client.reset_topic_metadata(resp.topic)
+                    raise
+                except OffsetOutOfRangeError:
+                    log.warning("OffsetOutOfRangeError for %s - %d. "
                                "Resetting partition offset...",
                                resp.topic, resp.partition)
+                    self.reset_partition_offset(resp.partition)
+                    # Retry this partition
+                    retry_partitions[resp.partition] = partitions[resp.partition]
+                    continue
+
                 partition = resp.partition
                 buffer_size = partitions[partition]
                 try:
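For orientation, a minimal usage sketch of the new auto_offset_reset parameter (not part of the diff above), assuming the old-style KafkaClient/SimpleConsumer API this module targets; the broker address, group, and topic names are placeholders:

    from kafka import KafkaClient, SimpleConsumer

    # Placeholder connection details; point this at a real broker.
    client = KafkaClient('localhost:9092')

    # auto_offset_reset='smallest' falls back to the earliest available offset
    # when a fetch fails with OffsetOutOfRangeError; 'largest' (the default)
    # skips ahead to the head of the log instead.
    consumer = SimpleConsumer(client, 'my-group', 'my-topic',
                              auto_offset_reset='smallest')

    for message in consumer:
        print(message)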
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 4723220..9c89190 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -5,7 +5,7 @@ from six.moves import xrange
 
 from kafka import SimpleConsumer, MultiProcessConsumer, KafkaConsumer, create_message
 from kafka.common import (
-    ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout
+    ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout, OffsetOutOfRangeError
 )
 from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
@@ -85,6 +85,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer.stop()
 
+    @kafka_versions('all')
+    def test_simple_consumer_smallest_offset_reset(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        consumer = self.consumer(auto_offset_reset='smallest')
+        # Move the fetch offset 300 messages past the end (out of range)
+        consumer.seek(300, 2)
+        # Since auto_offset_reset is set to smallest, we should read all 200
+        # messages from the beginning.
+        self.assert_message_count([message for message in consumer], 200)
+
+    @kafka_versions('all')
+    def test_simple_consumer_largest_offset_reset(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # Default is largest
+        consumer = self.consumer()
+        # Move the fetch offset 300 messages past the end (out of range)
+        consumer.seek(300, 2)
+        # Since auto_offset_reset defaults to largest, we should not read any
+        # messages.
+        self.assert_message_count([message for message in consumer], 0)
+        # Send 200 new messages to the topic
+        self.send_messages(0, range(200, 300))
+        self.send_messages(1, range(300, 400))
+        # Since the offset was reset to the end, we should read all 200 new messages.
+        self.assert_message_count([message for message in consumer], 200)
+
+    @kafka_versions('all')
+    def test_simple_consumer_no_reset(self):
+        self.send_messages(0, range(0, 100))
+        self.send_messages(1, range(100, 200))
+
+        # auto_offset_reset=None disables resets; the error should propagate
+        consumer = self.consumer(auto_offset_reset=None)
+        # Move the fetch offset 300 messages past the end (out of range)
+        consumer.seek(300, 2)
+        with self.assertRaises(OffsetOutOfRangeError):
+            consumer.get_message()
+
     @kafka_versions("all")
     def test_simple_consumer__seek(self):
         self.send_messages(0, range(0, 100))
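For reference, a standalone sketch of the offset lookup that reset_partition_offset() performs, built only from the OffsetRequest/check_error helpers the diff already imports; LATEST (-1) and EARLIEST (-2) are the special "time" values of the Kafka offset API, and the helper name query_reset_offset is hypothetical:

    from kafka.common import OffsetRequest, check_error

    LATEST = -1    # special time value: the offset just past the last message
    EARLIEST = -2  # special time value: the oldest offset still on the broker

    def query_reset_offset(client, topic, partition, policy='largest'):
        # 'largest' -> jump to the end of the log, 'smallest' -> back to the start
        when = LATEST if policy == 'largest' else EARLIEST
        (resp,) = client.send_offset_request([OffsetRequest(topic, partition, when, 1)])
        check_error(resp)       # raises if the broker returned an error code
        return resp.offsets[0]  # a single offset, since max_offsets=1 was requested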