diff options
author | Enrico Canzonieri <ecanzonieri@gmail.com> | 2015-01-06 17:33:21 +0100 |
---|---|---|
committer | Enrico Canzonieri <enrico@yelp.com> | 2015-01-14 15:13:33 -0800 |
commit | 878e4cefb696547d76f2eb0ad5b7afd2b1cfe044 (patch) | |
tree | 7ad6a258d4dcf0d59bcc532cb09420416087bd99 /kafka/consumer/simple.py | |
parent | 02c2b469003e2ddcb051dbb4d95977137050c19f (diff) | |
download | kafka-python-878e4cefb696547d76f2eb0ad5b7afd2b1cfe044.tar.gz |
Implement offset reset on OffsetOutOfRangeError
This slightly changes the SimpleConsumer interface by adding the
use_latest_offsets option, which defaults to True (reset to the latest
offset; False resets to the earliest). The fetch behaviour also changes:
it no longer raises OffsetOutOfRangeError. Resetting the offsets
automatically is especially useful in MultiprocessConsumer, where an
explicit seek call is not possible.
Diffstat (limited to 'kafka/consumer/simple.py')
-rw-r--r-- | kafka/consumer/simple.py | 38 |
1 files changed, 35 insertions, 3 deletions
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index df975f4..5cd15b5 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -16,7 +16,9 @@ except ImportError: # python 2 from kafka.common import ( FetchRequest, OffsetRequest, - ConsumerFetchSizeTooSmall, ConsumerNoMoreData + ConsumerFetchSizeTooSmall, ConsumerNoMoreData, + UnknownTopicOrPartitionError, NotLeaderForPartitionError, + OffsetOutOfRangeError, check_error ) from .base import ( Consumer, @@ -98,7 +100,8 @@ class SimpleConsumer(Consumer): fetch_size_bytes=FETCH_MIN_BYTES, buffer_size=FETCH_BUFFER_SIZE_BYTES, max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, - iter_timeout=None): + iter_timeout=None, + use_latest_offsets=True): super(SimpleConsumer, self).__init__( client, group, topic, partitions=partitions, @@ -117,12 +120,26 @@ class SimpleConsumer(Consumer): self.fetch_min_bytes = fetch_size_bytes self.fetch_offsets = self.offsets.copy() self.iter_timeout = iter_timeout + self.use_latest_offsets = use_latest_offsets self.queue = Queue() def __repr__(self): return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ (self.group, self.topic, str(self.offsets.keys())) + def reset_partition_offset(self, partition): + LATEST = -1 + EARLIEST = -2 + if self.use_latest_offsets: + req = OffsetRequest(self.topic, partition, LATEST, 1) + else: + req = OffsetRequest(self.topic, partition, EARLIEST, 1) + + resp = self.client.send_offset_request(req) + check_error(resp) + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + def provide_partition_info(self): """ Indicates that partition info must be returned by the consumer @@ -286,10 +303,25 @@ class SimpleConsumer(Consumer): responses = self.client.send_fetch_request( requests, max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes) + min_bytes=self.fetch_min_bytes, + fail_on_error=False + ) retry_partitions = {} for resp in responses: + + try: + check_error(resp) + 
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): + self.client.reset_topic_metadata(resp.topic) + raise + except OffsetOutOfRangeError: + log.warning("OffsetOutOfRangeError for %s - %d. " + "Resetting partition offset...", + resp.topic, resp.partition) + self.reset_partition_offset(resp.partition) + continue + partition = resp.partition buffer_size = partitions[partition] try: |