diff options
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r-- | kafka/consumer/simple.py | 38 |
1 files changed, 35 insertions, 3 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index df975f4..5cd15b5 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -16,7 +16,9 @@ except ImportError: # python 2 from kafka.common import ( FetchRequest, OffsetRequest, - ConsumerFetchSizeTooSmall, ConsumerNoMoreData + ConsumerFetchSizeTooSmall, ConsumerNoMoreData, + UnknownTopicOrPartitionError, NotLeaderForPartitionError, + OffsetOutOfRangeError, check_error ) from .base import ( Consumer, @@ -98,7 +100,8 @@ class SimpleConsumer(Consumer): fetch_size_bytes=FETCH_MIN_BYTES, buffer_size=FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, - iter_timeout=None): + iter_timeout=None, + use_latest_offsets=True): super(SimpleConsumer, self).__init__( client, group, topic, partitions=partitions, @@ -117,12 +120,26 @@ class SimpleConsumer(Consumer): self.fetch_min_bytes = fetch_size_bytes self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout + self.use_latest_offsets = use_latest_offsets self.queue = Queue() def __repr__(self): return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ (self.group, self.topic, str(self.offsets.keys())) + def reset_partition_offset(self, partition): + LATEST = -1 + EARLIEST = -2 + if self.use_latest_offsets: + req = OffsetRequest(self.topic, partition, LATEST, 1) + else: + req = OffsetRequest(self.topic, partition, EARLIEST, 1) + + resp = self.client.send_offset_request(req) + check_error(resp) + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + def provide_partition_info(self): """ Indicates that partition info must be returned by the consumer @@ -286,10 +303,25 @@ class SimpleConsumer(Consumer): responses = self.client.send_fetch_request( requests, max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes) + min_bytes=self.fetch_min_bytes, + fail_on_error=False + ) retry_partitions = {} for resp in responses: + + try: + check_error(resp) + except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): + self.client.reset_topic_metadata(resp.topic) + raise + except OffsetOutOfRangeError: + log.warning("OffsetOutOfRangeError for %s - %d. " + "Resetting partition offset...", + resp.topic, resp.partition) + self.reset_partition_offset(resp.partition) + continue + partition = resp.partition buffer_size = partitions[partition] try: |