summary | refs | log | tree | commit | diff
path: root/kafka/consumer/simple.py
diff options
context:
space:
mode:
author: Enrico Canzonieri <ecanzonieri@gmail.com> 2015-01-06 17:33:21 +0100
committer: Enrico Canzonieri <enrico@yelp.com> 2015-01-14 15:13:33 -0800
commit: 878e4cefb696547d76f2eb0ad5b7afd2b1cfe044 (patch)
tree: 7ad6a258d4dcf0d59bcc532cb09420416087bd99 /kafka/consumer/simple.py
parent: 02c2b469003e2ddcb051dbb4d95977137050c19f (diff)
download: kafka-python-878e4cefb696547d76f2eb0ad5b7afd2b1cfe044.tar.gz
Implement offset reset on OffsetOutOfRangeError
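This slightly changes the SimpleConsumer interface by adding the option use_latest_offsets, which defaults to True. The fetch behaviour also changes: it no longer raises OffsetOutOfRangeError. Instead, it logs a warning and resets the affected partition's offset to the latest available offset, or to the earliest one when use_latest_offsets is False. Resetting the offsets automatically is especially useful in MultiprocessConsumer, where an explicit seek call is not possible.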
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r-- kafka/consumer/simple.py 38
1 files changed, 35 insertions, 3 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index df975f4..5cd15b5 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -16,7 +16,9 @@ except ImportError: # python 2
from kafka.common import (
FetchRequest, OffsetRequest,
- ConsumerFetchSizeTooSmall, ConsumerNoMoreData
+ ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
+ UnknownTopicOrPartitionError, NotLeaderForPartitionError,
+ OffsetOutOfRangeError, check_error
)
from .base import (
Consumer,
@@ -98,7 +100,8 @@ class SimpleConsumer(Consumer):
fetch_size_bytes=FETCH_MIN_BYTES,
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
- iter_timeout=None):
+ iter_timeout=None,
+ use_latest_offsets=True):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
@@ -117,12 +120,26 @@ class SimpleConsumer(Consumer):
self.fetch_min_bytes = fetch_size_bytes
self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
+ self.use_latest_offsets = use_latest_offsets
self.queue = Queue()
def __repr__(self):
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
(self.group, self.topic, str(self.offsets.keys()))
+ def reset_partition_offset(self, partition):
+ LATEST = -1
+ EARLIEST = -2
+ if self.use_latest_offsets:
+ req = OffsetRequest(self.topic, partition, LATEST, 1)
+ else:
+ req = OffsetRequest(self.topic, partition, EARLIEST, 1)
+
+ resp = self.client.send_offset_request(req)
+ check_error(resp)
+ self.offsets[partition] = resp.offsets[0]
+ self.fetch_offsets[partition] = resp.offsets[0]
+
def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
@@ -286,10 +303,25 @@ class SimpleConsumer(Consumer):
responses = self.client.send_fetch_request(
requests,
max_wait_time=int(self.fetch_max_wait_time),
- min_bytes=self.fetch_min_bytes)
+ min_bytes=self.fetch_min_bytes,
+ fail_on_error=False
+ )
retry_partitions = {}
for resp in responses:
+
+ try:
+ check_error(resp)
+ except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
+ self.client.reset_topic_metadata(resp.topic)
+ raise
+ except OffsetOutOfRangeError:
+ log.warning("OffsetOutOfRangeError for %s - %d. "
+ "Resetting partition offset...",
+ resp.topic, resp.partition)
+ self.reset_partition_offset(resp.partition)
+ continue
+
partition = resp.partition
buffer_size = partitions[partition]
try: