summaryrefslogtreecommitdiff
path: root/kafka/consumer/simple.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r--kafka/consumer/simple.py38
1 files changed, 35 insertions, 3 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index df975f4..5cd15b5 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -16,7 +16,9 @@ except ImportError: # python 2
from kafka.common import (
FetchRequest, OffsetRequest,
- ConsumerFetchSizeTooSmall, ConsumerNoMoreData
+ ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
+ UnknownTopicOrPartitionError, NotLeaderForPartitionError,
+ OffsetOutOfRangeError, check_error
)
from .base import (
Consumer,
@@ -98,7 +100,8 @@ class SimpleConsumer(Consumer):
fetch_size_bytes=FETCH_MIN_BYTES,
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
- iter_timeout=None):
+ iter_timeout=None,
+ use_latest_offsets=True):
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
@@ -117,12 +120,26 @@ class SimpleConsumer(Consumer):
self.fetch_min_bytes = fetch_size_bytes
self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
+ self.use_latest_offsets = use_latest_offsets
self.queue = Queue()
def __repr__(self):
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
(self.group, self.topic, str(self.offsets.keys()))
+ def reset_partition_offset(self, partition):
+ LATEST = -1
+ EARLIEST = -2
+ if self.use_latest_offsets:
+ req = OffsetRequest(self.topic, partition, LATEST, 1)
+ else:
+ req = OffsetRequest(self.topic, partition, EARLIEST, 1)
+
+ resp = self.client.send_offset_request(req)
+ check_error(resp)
+ self.offsets[partition] = resp.offsets[0]
+ self.fetch_offsets[partition] = resp.offsets[0]
+
def provide_partition_info(self):
"""
Indicates that partition info must be returned by the consumer
@@ -286,10 +303,25 @@ class SimpleConsumer(Consumer):
responses = self.client.send_fetch_request(
requests,
max_wait_time=int(self.fetch_max_wait_time),
- min_bytes=self.fetch_min_bytes)
+ min_bytes=self.fetch_min_bytes,
+ fail_on_error=False
+ )
retry_partitions = {}
for resp in responses:
+
+ try:
+ check_error(resp)
+ except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
+ self.client.reset_topic_metadata(resp.topic)
+ raise
+ except OffsetOutOfRangeError:
+ log.warning("OffsetOutOfRangeError for %s - %d. "
+ "Resetting partition offset...",
+ resp.topic, resp.partition)
+ self.reset_partition_offset(resp.partition)
+ continue
+
partition = resp.partition
buffer_size = partitions[partition]
try: